1 /* GStreamer
2 * Copyright (C) 2007 David Schleef <ds@schleef.org>
3 * (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20 /**
21 * SECTION:gstappsrc
22 * @title: GstAppSrc
23 * @short_description: Easy way for applications to inject buffers into a
24 * pipeline
25 * @see_also: #GstBaseSrc, appsink
26 *
27 * The appsrc element can be used by applications to insert data into a
28 * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29 * external API functions.
30 *
31 * appsrc can be used by linking with the libgstapp library to access the
32 * methods directly or by using the appsrc action signals.
33 *
34 * Before operating appsrc, the caps property must be set to fixed caps
35 * describing the format of the data that will be pushed with appsrc. An
36 * exception to this is when pushing buffers with unknown caps, in which case no
37 * caps should be set. This is typically true of file-like sources that push raw
38 * byte buffers. If you don't want to explicitly set the caps, you can use
39 * gst_app_src_push_sample. This method gets the caps associated with the
40 * sample and sets them on the appsrc replacing any previously set caps (if
41 * different from sample's caps).
42 *
43 * The main way of handing data to the appsrc element is by calling the
44 * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
45 * This will put the buffer onto a queue from which appsrc will read from in its
46 * streaming thread. It is important to note that data transport will not happen
47 * from the thread that performed the push-buffer call.
48 *
49 * The "max-bytes" property controls how much data can be queued in appsrc
50 * before appsrc considers the queue full. A filled internal queue will always
51 * signal the "enough-data" signal, which signals the application that it should
52 * stop pushing data into appsrc. The "block" property will cause appsrc to
53 * block the push-buffer method until free data becomes available again.
54 *
55 * When the internal queue is running out of data, the "need-data" signal is
56 * emitted, which signals the application that it should start pushing more data
57 * into appsrc.
58 *
59 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
60 * "seek-data" signal when the "stream-mode" property is set to "seekable" or
61 * "random-access". The signal argument will contain the new desired position in
62 * the stream expressed in the unit set with the "format" property. After
63 * receiving the seek-data signal, the application should push-buffers from the
64 * new position.
65 *
66 * These signals allow the application to operate the appsrc in two different
67 * ways:
68 *
69 * The push mode, in which the application repeatedly calls the push-buffer/push-sample
70 * method with a new buffer/sample. Optionally, the queue size in the appsrc
71 * can be controlled with the enough-data and need-data signals by respectively
72 * stopping/starting the push-buffer/push-sample calls. This is a typical
73 * mode of operation for the stream-type "stream" and "seekable". Use this
74 * mode when implementing various network protocols or hardware devices.
75 *
76 * The pull mode, in which the need-data signal triggers the next push-buffer call.
77 * This mode is typically used in the "random-access" stream-type. Use this
78 * mode for file access or other randomly accessable sources. In this mode, a
79 * buffer of exactly the amount of bytes given by the need-data signal should be
80 * pushed into appsrc.
81 *
82 * In all modes, the size property on appsrc should contain the total stream
83 * size in bytes. Setting this property is mandatory in the random-access mode.
84 * For the stream and seekable modes, setting this property is optional but
85 * recommended.
86 *
87 * When the application has finished pushing data into appsrc, it should call
88 * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
89 * this call, no more buffers can be pushed into appsrc until a flushing seek
90 * occurs or the state of the appsrc has gone through READY.
91 */
92
93 #ifdef HAVE_CONFIG_H
94 #include "config.h"
95 #endif
96
97 #include <gst/gst.h>
98 #include <gst/base/base.h>
99
100 #include <string.h>
101
102 #include "gstappsrc.h"
103
104 typedef enum
105 {
106 NOONE_WAITING = 0,
107 STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */
108 APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
109 } GstAppSrcWaitStatus;
110
111 struct _GstAppSrcPrivate
112 {
113 GCond cond;
114 GMutex mutex;
115 GstQueueArray *queue;
116 GstAppSrcWaitStatus wait_status;
117
118 GstCaps *last_caps;
119 GstCaps *current_caps;
120
121 gint64 size;
122 GstClockTime duration;
123 GstAppStreamType stream_type;
124 guint64 max_bytes;
125 GstFormat format;
126 gboolean block;
127 gchar *uri;
128
129 gboolean flushing;
130 gboolean started;
131 gboolean is_eos;
132 guint64 queued_bytes;
133 guint64 offset;
134 GstAppStreamType current_type;
135
136 guint64 min_latency;
137 guint64 max_latency;
138 gboolean emit_signals;
139 guint min_percent;
140
141 GstAppSrcCallbacks callbacks;
142 gpointer user_data;
143 GDestroyNotify notify;
144 };
145
146 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
147 #define GST_CAT_DEFAULT app_src_debug
148
149 enum
150 {
151 /* signals */
152 SIGNAL_NEED_DATA,
153 SIGNAL_ENOUGH_DATA,
154 SIGNAL_SEEK_DATA,
155
156 /* actions */
157 SIGNAL_PUSH_BUFFER,
158 SIGNAL_END_OF_STREAM,
159 SIGNAL_PUSH_SAMPLE,
160 SIGNAL_PUSH_BUFFER_LIST,
161
162 LAST_SIGNAL
163 };
164
165 #define DEFAULT_PROP_SIZE -1
166 #define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
167 #define DEFAULT_PROP_MAX_BYTES 200000
168 #define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
169 #define DEFAULT_PROP_BLOCK FALSE
170 #define DEFAULT_PROP_IS_LIVE FALSE
171 #define DEFAULT_PROP_MIN_LATENCY -1
172 #define DEFAULT_PROP_MAX_LATENCY -1
173 #define DEFAULT_PROP_EMIT_SIGNALS TRUE
174 #define DEFAULT_PROP_MIN_PERCENT 0
175 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES 0
176 #define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
177
178 enum
179 {
180 PROP_0,
181 PROP_CAPS,
182 PROP_SIZE,
183 PROP_STREAM_TYPE,
184 PROP_MAX_BYTES,
185 PROP_FORMAT,
186 PROP_BLOCK,
187 PROP_IS_LIVE,
188 PROP_MIN_LATENCY,
189 PROP_MAX_LATENCY,
190 PROP_EMIT_SIGNALS,
191 PROP_MIN_PERCENT,
192 PROP_CURRENT_LEVEL_BYTES,
193 PROP_DURATION,
194 PROP_LAST
195 };
196
197 static GstStaticPadTemplate gst_app_src_template =
198 GST_STATIC_PAD_TEMPLATE ("src",
199 GST_PAD_SRC,
200 GST_PAD_ALWAYS,
201 GST_STATIC_CAPS_ANY);
202
203 static void gst_app_src_uri_handler_init (gpointer g_iface,
204 gpointer iface_data);
205
206 static void gst_app_src_dispose (GObject * object);
207 static void gst_app_src_finalize (GObject * object);
208
209 static void gst_app_src_set_property (GObject * object, guint prop_id,
210 const GValue * value, GParamSpec * pspec);
211 static void gst_app_src_get_property (GObject * object, guint prop_id,
212 GValue * value, GParamSpec * pspec);
213
214 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
215
216 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
217 gboolean do_min, guint64 min, gboolean do_max, guint64 max);
218
219 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
220 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
221 GstCaps * filter);
222 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
223 guint size, GstBuffer ** buf);
224 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
225 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
226 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
227 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
228 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
229 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
230 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
231 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
232 static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
233
234 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
235 GstBuffer * buffer);
236 static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
237 GstBufferList * buffer_list);
238 static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
239 GstSample * sample);
240
241 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
242
243 #define gst_app_src_parent_class parent_class
244 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
245 G_ADD_PRIVATE (GstAppSrc)
246 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
247
248 static void
gst_app_src_class_init(GstAppSrcClass * klass)249 gst_app_src_class_init (GstAppSrcClass * klass)
250 {
251 GObjectClass *gobject_class = (GObjectClass *) klass;
252 GstElementClass *element_class = (GstElementClass *) klass;
253 GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
254
255 GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
256
257 gobject_class->dispose = gst_app_src_dispose;
258 gobject_class->finalize = gst_app_src_finalize;
259
260 gobject_class->set_property = gst_app_src_set_property;
261 gobject_class->get_property = gst_app_src_get_property;
262
263 /**
264 * GstAppSrc::caps:
265 *
266 * The GstCaps that will negotiated downstream and will be put
267 * on outgoing buffers.
268 */
269 g_object_class_install_property (gobject_class, PROP_CAPS,
270 g_param_spec_boxed ("caps", "Caps",
271 "The allowed caps for the src pad", GST_TYPE_CAPS,
272 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 /**
274 * GstAppSrc::format:
275 *
276 * The format to use for segment events. When the source is producing
277 * timestamped buffers this property should be set to GST_FORMAT_TIME.
278 */
279 g_object_class_install_property (gobject_class, PROP_FORMAT,
280 g_param_spec_enum ("format", "Format",
281 "The format of the segment events and seek", GST_TYPE_FORMAT,
282 DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283 /**
284 * GstAppSrc::size:
285 *
286 * The total size in bytes of the data stream. If the total size is known, it
287 * is recommended to configure it with this property.
288 */
289 g_object_class_install_property (gobject_class, PROP_SIZE,
290 g_param_spec_int64 ("size", "Size",
291 "The size of the data stream in bytes (-1 if unknown)",
292 -1, G_MAXINT64, DEFAULT_PROP_SIZE,
293 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
294 /**
295 * GstAppSrc::stream-type:
296 *
297 * The type of stream that this source is producing. For seekable streams the
298 * application should connect to the seek-data signal.
299 */
300 g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
301 g_param_spec_enum ("stream-type", "Stream Type",
302 "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
303 DEFAULT_PROP_STREAM_TYPE,
304 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305 /**
306 * GstAppSrc::max-bytes:
307 *
308 * The maximum amount of bytes that can be queued internally.
309 * After the maximum amount of bytes are queued, appsrc will emit the
310 * "enough-data" signal.
311 */
312 g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
313 g_param_spec_uint64 ("max-bytes", "Max bytes",
314 "The maximum number of bytes to queue internally (0 = unlimited)",
315 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
316 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
317 /**
318 * GstAppSrc::block:
319 *
320 * When max-bytes are queued and after the enough-data signal has been emitted,
321 * block any further push-buffer calls until the amount of queued bytes drops
322 * below the max-bytes limit.
323 */
324 g_object_class_install_property (gobject_class, PROP_BLOCK,
325 g_param_spec_boolean ("block", "Block",
326 "Block push-buffer when max-bytes are queued",
327 DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328
329 /**
330 * GstAppSrc::is-live:
331 *
332 * Instruct the source to behave like a live source. This includes that it
333 * will only push out buffers in the PLAYING state.
334 */
335 g_object_class_install_property (gobject_class, PROP_IS_LIVE,
336 g_param_spec_boolean ("is-live", "Is Live",
337 "Whether to act as a live source",
338 DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 /**
340 * GstAppSrc::min-latency:
341 *
342 * The minimum latency of the source. A value of -1 will use the default
343 * latency calculations of #GstBaseSrc.
344 */
345 g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
346 g_param_spec_int64 ("min-latency", "Min Latency",
347 "The minimum latency (-1 = default)",
348 -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
349 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350 /**
351 * GstAppSrc::max-latency:
352 *
353 * The maximum latency of the source. A value of -1 means an unlimited amout
354 * of latency.
355 */
356 g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
357 g_param_spec_int64 ("max-latency", "Max Latency",
358 "The maximum latency (-1 = unlimited)",
359 -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
360 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361
362 /**
363 * GstAppSrc::emit-signals:
364 *
365 * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
366 * This option is by default enabled for backwards compatibility reasons but
367 * can disabled when needed because signal emission is expensive.
368 */
369 g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
370 g_param_spec_boolean ("emit-signals", "Emit signals",
371 "Emit need-data, enough-data and seek-data signals",
372 DEFAULT_PROP_EMIT_SIGNALS,
373 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374
375 /**
376 * GstAppSrc::min-percent:
377 *
378 * Make appsrc emit the "need-data" signal when the amount of bytes in the
379 * queue drops below this percentage of max-bytes.
380 */
381 g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
382 g_param_spec_uint ("min-percent", "Min Percent",
383 "Emit need-data when queued bytes drops below this percent of max-bytes",
384 0, 100, DEFAULT_PROP_MIN_PERCENT,
385 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386
387 /**
388 * GstAppSrc::current-level-bytes:
389 *
390 * The number of currently queued bytes inside appsrc.
391 *
392 * Since: 1.2
393 */
394 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
395 g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
396 "The number of currently queued bytes",
397 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
398 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
399
400 /**
401 * GstAppSrc::duration:
402 *
403 * The total duration in nanoseconds of the data stream. If the total duration is known, it
404 * is recommended to configure it with this property.
405 *
406 * Since: 1.10
407 */
408 g_object_class_install_property (gobject_class, PROP_DURATION,
409 g_param_spec_uint64 ("duration", "Duration",
410 "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
411 0, G_MAXUINT64, DEFAULT_PROP_DURATION,
412 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414 /**
415 * GstAppSrc::need-data:
416 * @appsrc: the appsrc element that emitted the signal
417 * @length: the amount of bytes needed.
418 *
419 * Signal that the source needs more data. In the callback or from another
420 * thread you should call push-buffer or end-of-stream.
421 *
422 * @length is just a hint and when it is set to -1, any number of bytes can be
423 * pushed into @appsrc.
424 *
425 * You can call push-buffer multiple times until the enough-data signal is
426 * fired.
427 */
428 gst_app_src_signals[SIGNAL_NEED_DATA] =
429 g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
430 G_STRUCT_OFFSET (GstAppSrcClass, need_data),
431 NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
432
433 /**
434 * GstAppSrc::enough-data:
435 * @appsrc: the appsrc element that emitted the signal
436 *
437 * Signal that the source has enough data. It is recommended that the
438 * application stops calling push-buffer until the need-data signal is
439 * emitted again to avoid excessive buffer queueing.
440 */
441 gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
442 g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
443 G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
444 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
445
446 /**
447 * GstAppSrc::seek-data:
448 * @appsrc: the appsrc element that emitted the signal
449 * @offset: the offset to seek to
450 *
451 * Seek to the given offset. The next push-buffer should produce buffers from
452 * the new @offset.
453 * This callback is only called for seekable stream types.
454 *
455 * Returns: %TRUE if the seek succeeded.
456 */
457 gst_app_src_signals[SIGNAL_SEEK_DATA] =
458 g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
459 G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
460 NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
461
462 /**
463 * GstAppSrc::push-buffer:
464 * @appsrc: the appsrc
465 * @buffer: a buffer to push
466 *
467 * Adds a buffer to the queue of buffers that the appsrc element will
468 * push to its source pad. This function does not take ownership of the
469 * buffer so the buffer needs to be unreffed after calling this function.
470 *
471 * When the block property is TRUE, this function can block until free space
472 * becomes available in the queue.
473 */
474 gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
475 g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
476 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
477 push_buffer), NULL, NULL, NULL,
478 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
479
480 /**
481 * GstAppSrc::push-buffer-list:
482 * @appsrc: the appsrc
483 * @buffer_list: a buffer list to push
484 *
485 * Adds a buffer list to the queue of buffers and buffer lists that the
486 * appsrc element will push to its source pad. This function does not take
487 * ownership of the buffer list so the buffer list needs to be unreffed
488 * after calling this function.
489 *
490 * When the block property is TRUE, this function can block until free space
491 * becomes available in the queue.
492 *
493 * Since: 1.14
494 */
495 gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
496 g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
497 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
498 push_buffer_list), NULL, NULL, NULL,
499 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
500
501 /**
502 * GstAppSrc::push-sample:
503 * @appsrc: the appsrc
504 * @sample: a sample from which extract buffer to push
505 *
506 * Extract a buffer from the provided sample and adds the extracted buffer
507 * to the queue of buffers that the appsrc element will
508 * push to its source pad. This function set the appsrc caps based on the caps
509 * in the sample and reset the caps if they change.
510 * Only the caps and the buffer of the provided sample are used and not
511 * for example the segment in the sample.
512 * This function does not take ownership of the
513 * sample so the sample needs to be unreffed after calling this function.
514 *
515 * When the block property is TRUE, this function can block until free space
516 * becomes available in the queue.
517 *
518 * Since: 1.6
519 *
520 */
521 gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
522 g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
523 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
524 push_sample), NULL, NULL, NULL,
525 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
526
527
528 /**
529 * GstAppSrc::end-of-stream:
530 * @appsrc: the appsrc
531 *
532 * Notify @appsrc that no more buffer are available.
533 */
534 gst_app_src_signals[SIGNAL_END_OF_STREAM] =
535 g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
536 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
537 end_of_stream), NULL, NULL, NULL,
538 GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
539
540 gst_element_class_set_static_metadata (element_class, "AppSrc",
541 "Generic/Source", "Allow the application to feed buffers to a pipeline",
542 "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
543
544 gst_element_class_add_static_pad_template (element_class,
545 &gst_app_src_template);
546
547 element_class->send_event = gst_app_src_send_event;
548
549 basesrc_class->negotiate = gst_app_src_negotiate;
550 basesrc_class->get_caps = gst_app_src_internal_get_caps;
551 basesrc_class->create = gst_app_src_create;
552 basesrc_class->start = gst_app_src_start;
553 basesrc_class->stop = gst_app_src_stop;
554 basesrc_class->unlock = gst_app_src_unlock;
555 basesrc_class->unlock_stop = gst_app_src_unlock_stop;
556 basesrc_class->do_seek = gst_app_src_do_seek;
557 basesrc_class->is_seekable = gst_app_src_is_seekable;
558 basesrc_class->get_size = gst_app_src_do_get_size;
559 basesrc_class->query = gst_app_src_query;
560 basesrc_class->event = gst_app_src_event;
561
562 klass->push_buffer = gst_app_src_push_buffer_action;
563 klass->push_buffer_list = gst_app_src_push_buffer_list_action;
564 klass->push_sample = gst_app_src_push_sample_action;
565 klass->end_of_stream = gst_app_src_end_of_stream;
566 }
567
568 static void
gst_app_src_init(GstAppSrc * appsrc)569 gst_app_src_init (GstAppSrc * appsrc)
570 {
571 GstAppSrcPrivate *priv;
572
573 priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
574
575 g_mutex_init (&priv->mutex);
576 g_cond_init (&priv->cond);
577 priv->queue = gst_queue_array_new (16);
578 priv->wait_status = NOONE_WAITING;
579
580 priv->size = DEFAULT_PROP_SIZE;
581 priv->duration = DEFAULT_PROP_DURATION;
582 priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
583 priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
584 priv->format = DEFAULT_PROP_FORMAT;
585 priv->block = DEFAULT_PROP_BLOCK;
586 priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
587 priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
588 priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
589 priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
590
591 gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
592 }
593
594 /* Must be called with priv->mutex */
595 static void
gst_app_src_flush_queued(GstAppSrc * src,gboolean retain_last_caps)596 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
597 {
598 GstMiniObject *obj;
599 GstAppSrcPrivate *priv = src->priv;
600 GstCaps *requeue_caps = NULL;
601
602 while (!gst_queue_array_is_empty (priv->queue)) {
603 obj = gst_queue_array_pop_head (priv->queue);
604 if (obj) {
605 if (GST_IS_CAPS (obj) && retain_last_caps) {
606 gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
607 }
608 gst_mini_object_unref (obj);
609 }
610 }
611
612 if (requeue_caps) {
613 gst_queue_array_push_tail (priv->queue, requeue_caps);
614 }
615
616 priv->queued_bytes = 0;
617 }
618
619 static void
gst_app_src_dispose(GObject * obj)620 gst_app_src_dispose (GObject * obj)
621 {
622 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
623 GstAppSrcPrivate *priv = appsrc->priv;
624
625 GST_OBJECT_LOCK (appsrc);
626 if (priv->current_caps) {
627 gst_caps_unref (priv->current_caps);
628 priv->current_caps = NULL;
629 }
630 if (priv->last_caps) {
631 gst_caps_unref (priv->last_caps);
632 priv->last_caps = NULL;
633 }
634 if (priv->notify) {
635 priv->notify (priv->user_data);
636 }
637 priv->user_data = NULL;
638 priv->notify = NULL;
639
640 GST_OBJECT_UNLOCK (appsrc);
641
642 g_mutex_lock (&priv->mutex);
643 gst_app_src_flush_queued (appsrc, FALSE);
644 g_mutex_unlock (&priv->mutex);
645
646 G_OBJECT_CLASS (parent_class)->dispose (obj);
647 }
648
649 static void
gst_app_src_finalize(GObject * obj)650 gst_app_src_finalize (GObject * obj)
651 {
652 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
653 GstAppSrcPrivate *priv = appsrc->priv;
654
655 g_mutex_clear (&priv->mutex);
656 g_cond_clear (&priv->cond);
657 gst_queue_array_free (priv->queue);
658
659 g_free (priv->uri);
660
661 G_OBJECT_CLASS (parent_class)->finalize (obj);
662 }
663
664 static GstCaps *
gst_app_src_internal_get_caps(GstBaseSrc * bsrc,GstCaps * filter)665 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
666 {
667 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
668 GstCaps *caps;
669
670 GST_OBJECT_LOCK (appsrc);
671 if ((caps = appsrc->priv->current_caps))
672 gst_caps_ref (caps);
673 GST_OBJECT_UNLOCK (appsrc);
674
675 if (filter) {
676 if (caps) {
677 GstCaps *intersection =
678 gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
679 gst_caps_unref (caps);
680 caps = intersection;
681 } else {
682 caps = gst_caps_ref (filter);
683 }
684 }
685
686 GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
687 return caps;
688 }
689
690 static void
gst_app_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)691 gst_app_src_set_property (GObject * object, guint prop_id,
692 const GValue * value, GParamSpec * pspec)
693 {
694 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
695 GstAppSrcPrivate *priv = appsrc->priv;
696
697 switch (prop_id) {
698 case PROP_CAPS:
699 gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
700 break;
701 case PROP_SIZE:
702 gst_app_src_set_size (appsrc, g_value_get_int64 (value));
703 break;
704 case PROP_STREAM_TYPE:
705 gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
706 break;
707 case PROP_MAX_BYTES:
708 gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
709 break;
710 case PROP_FORMAT:
711 priv->format = g_value_get_enum (value);
712 break;
713 case PROP_BLOCK:
714 priv->block = g_value_get_boolean (value);
715 break;
716 case PROP_IS_LIVE:
717 gst_base_src_set_live (GST_BASE_SRC (appsrc),
718 g_value_get_boolean (value));
719 break;
720 case PROP_MIN_LATENCY:
721 gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
722 FALSE, -1);
723 break;
724 case PROP_MAX_LATENCY:
725 gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
726 g_value_get_int64 (value));
727 break;
728 case PROP_EMIT_SIGNALS:
729 gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
730 break;
731 case PROP_MIN_PERCENT:
732 priv->min_percent = g_value_get_uint (value);
733 break;
734 case PROP_DURATION:
735 gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
736 break;
737 default:
738 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
739 break;
740 }
741 }
742
743 static void
gst_app_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)744 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
745 GParamSpec * pspec)
746 {
747 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
748 GstAppSrcPrivate *priv = appsrc->priv;
749
750 switch (prop_id) {
751 case PROP_CAPS:
752 g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
753 break;
754 case PROP_SIZE:
755 g_value_set_int64 (value, gst_app_src_get_size (appsrc));
756 break;
757 case PROP_STREAM_TYPE:
758 g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
759 break;
760 case PROP_MAX_BYTES:
761 g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
762 break;
763 case PROP_FORMAT:
764 g_value_set_enum (value, priv->format);
765 break;
766 case PROP_BLOCK:
767 g_value_set_boolean (value, priv->block);
768 break;
769 case PROP_IS_LIVE:
770 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
771 break;
772 case PROP_MIN_LATENCY:
773 {
774 guint64 min = 0;
775
776 gst_app_src_get_latency (appsrc, &min, NULL);
777 g_value_set_int64 (value, min);
778 break;
779 }
780 case PROP_MAX_LATENCY:
781 {
782 guint64 max = 0;
783
784 gst_app_src_get_latency (appsrc, NULL, &max);
785 g_value_set_int64 (value, max);
786 break;
787 }
788 case PROP_EMIT_SIGNALS:
789 g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
790 break;
791 case PROP_MIN_PERCENT:
792 g_value_set_uint (value, priv->min_percent);
793 break;
794 case PROP_CURRENT_LEVEL_BYTES:
795 g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
796 break;
797 case PROP_DURATION:
798 g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
799 break;
800 default:
801 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
802 break;
803 }
804 }
805
806 static gboolean
gst_app_src_send_event(GstElement * element,GstEvent * event)807 gst_app_src_send_event (GstElement * element, GstEvent * event)
808 {
809 GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
810 GstAppSrcPrivate *priv = appsrc->priv;
811
812 switch (GST_EVENT_TYPE (event)) {
813 case GST_EVENT_FLUSH_STOP:
814 g_mutex_lock (&priv->mutex);
815 gst_app_src_flush_queued (appsrc, TRUE);
816 g_mutex_unlock (&priv->mutex);
817 break;
818 default:
819 break;
820 }
821
822 return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
823 event), FALSE);
824 }
825
826 static gboolean
gst_app_src_unlock(GstBaseSrc * bsrc)827 gst_app_src_unlock (GstBaseSrc * bsrc)
828 {
829 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
830 GstAppSrcPrivate *priv = appsrc->priv;
831
832 g_mutex_lock (&priv->mutex);
833 GST_DEBUG_OBJECT (appsrc, "unlock start");
834 priv->flushing = TRUE;
835 g_cond_broadcast (&priv->cond);
836 g_mutex_unlock (&priv->mutex);
837
838 return TRUE;
839 }
840
841 static gboolean
gst_app_src_unlock_stop(GstBaseSrc * bsrc)842 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
843 {
844 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
845 GstAppSrcPrivate *priv = appsrc->priv;
846
847 g_mutex_lock (&priv->mutex);
848 GST_DEBUG_OBJECT (appsrc, "unlock stop");
849 priv->flushing = FALSE;
850 g_cond_broadcast (&priv->cond);
851 g_mutex_unlock (&priv->mutex);
852
853 return TRUE;
854 }
855
856 static gboolean
gst_app_src_start(GstBaseSrc * bsrc)857 gst_app_src_start (GstBaseSrc * bsrc)
858 {
859 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
860 GstAppSrcPrivate *priv = appsrc->priv;
861
862 g_mutex_lock (&priv->mutex);
863 GST_DEBUG_OBJECT (appsrc, "starting");
864 priv->started = TRUE;
865 /* set the offset to -1 so that we always do a first seek. This is only used
866 * in random-access mode. */
867 priv->offset = -1;
868 priv->flushing = FALSE;
869 g_mutex_unlock (&priv->mutex);
870
871 gst_base_src_set_format (bsrc, priv->format);
872
873 return TRUE;
874 }
875
876 static gboolean
gst_app_src_stop(GstBaseSrc * bsrc)877 gst_app_src_stop (GstBaseSrc * bsrc)
878 {
879 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
880 GstAppSrcPrivate *priv = appsrc->priv;
881
882 g_mutex_lock (&priv->mutex);
883 GST_DEBUG_OBJECT (appsrc, "stopping");
884 priv->is_eos = FALSE;
885 priv->flushing = TRUE;
886 priv->started = FALSE;
887 gst_app_src_flush_queued (appsrc, TRUE);
888 g_cond_broadcast (&priv->cond);
889 g_mutex_unlock (&priv->mutex);
890
891 return TRUE;
892 }
893
894 static gboolean
gst_app_src_is_seekable(GstBaseSrc * src)895 gst_app_src_is_seekable (GstBaseSrc * src)
896 {
897 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
898 GstAppSrcPrivate *priv = appsrc->priv;
899 gboolean res = FALSE;
900
901 switch (priv->stream_type) {
902 case GST_APP_STREAM_TYPE_STREAM:
903 break;
904 case GST_APP_STREAM_TYPE_SEEKABLE:
905 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
906 res = TRUE;
907 break;
908 }
909 return res;
910 }
911
912 static gboolean
gst_app_src_do_get_size(GstBaseSrc * src,guint64 * size)913 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
914 {
915 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
916
917 *size = gst_app_src_get_size (appsrc);
918
919 return TRUE;
920 }
921
922 static gboolean
gst_app_src_query(GstBaseSrc * src,GstQuery * query)923 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
924 {
925 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
926 GstAppSrcPrivate *priv = appsrc->priv;
927 gboolean res;
928
929 switch (GST_QUERY_TYPE (query)) {
930 case GST_QUERY_LATENCY:
931 {
932 GstClockTime min, max;
933 gboolean live;
934
935 /* Query the parent class for the defaults */
936 res = gst_base_src_query_latency (src, &live, &min, &max);
937
938 /* overwrite with our values when we need to */
939 g_mutex_lock (&priv->mutex);
940 if (priv->min_latency != -1) {
941 min = priv->min_latency;
942 max = priv->max_latency;
943 }
944 g_mutex_unlock (&priv->mutex);
945
946 gst_query_set_latency (query, live, min, max);
947 break;
948 }
949 case GST_QUERY_SCHEDULING:
950 {
951 gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
952 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
953
954 switch (priv->stream_type) {
955 case GST_APP_STREAM_TYPE_STREAM:
956 case GST_APP_STREAM_TYPE_SEEKABLE:
957 break;
958 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
959 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
960 break;
961 }
962 res = TRUE;
963 break;
964 }
965 case GST_QUERY_DURATION:
966 {
967 GstFormat format;
968 gst_query_parse_duration (query, &format, NULL);
969 if (format == GST_FORMAT_BYTES) {
970 gst_query_set_duration (query, format, priv->size);
971 res = TRUE;
972 } else if (format == GST_FORMAT_TIME) {
973 if (priv->duration != GST_CLOCK_TIME_NONE) {
974 gst_query_set_duration (query, format, priv->duration);
975 res = TRUE;
976 } else {
977 res = FALSE;
978 }
979 } else {
980 res = FALSE;
981 }
982 break;
983 }
984 default:
985 res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
986 break;
987 }
988
989 return res;
990 }
991
992 /* will be called in push mode */
993 static gboolean
gst_app_src_do_seek(GstBaseSrc * src,GstSegment * segment)994 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
995 {
996 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
997 GstAppSrcPrivate *priv = appsrc->priv;
998 gint64 desired_position;
999 gboolean res = FALSE;
1000
1001 desired_position = segment->position;
1002
1003 /* no need to try to seek in streaming mode */
1004 if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1005 return TRUE;
1006
1007 GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1008 desired_position, gst_format_get_name (segment->format));
1009
1010 if (priv->callbacks.seek_data)
1011 res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data);
1012 else {
1013 gboolean emit;
1014
1015 g_mutex_lock (&priv->mutex);
1016 emit = priv->emit_signals;
1017 g_mutex_unlock (&priv->mutex);
1018
1019 if (emit)
1020 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1021 desired_position, &res);
1022 }
1023
1024 if (res) {
1025 GST_DEBUG_OBJECT (appsrc, "flushing queue");
1026 g_mutex_lock (&priv->mutex);
1027 gst_app_src_flush_queued (appsrc, TRUE);
1028 g_mutex_unlock (&priv->mutex);
1029 priv->is_eos = FALSE;
1030 } else {
1031 GST_WARNING_OBJECT (appsrc, "seek failed");
1032 }
1033
1034 return res;
1035 }
1036
1037 /* must be called with the appsrc mutex */
1038 static gboolean
gst_app_src_emit_seek(GstAppSrc * appsrc,guint64 offset)1039 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1040 {
1041 gboolean res = FALSE;
1042 gboolean emit;
1043 GstAppSrcPrivate *priv = appsrc->priv;
1044
1045 emit = priv->emit_signals;
1046 g_mutex_unlock (&priv->mutex);
1047
1048 GST_DEBUG_OBJECT (appsrc,
1049 "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1050 priv->offset, offset);
1051
1052 if (priv->callbacks.seek_data)
1053 res = priv->callbacks.seek_data (appsrc, offset, priv->user_data);
1054 else if (emit)
1055 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1056 offset, &res);
1057
1058 g_mutex_lock (&priv->mutex);
1059
1060 return res;
1061 }
1062
1063 /* must be called with the appsrc mutex. After this call things can be
1064 * flushing */
1065 static void
gst_app_src_emit_need_data(GstAppSrc * appsrc,guint size)1066 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1067 {
1068 gboolean emit;
1069 GstAppSrcPrivate *priv = appsrc->priv;
1070
1071 emit = priv->emit_signals;
1072 g_mutex_unlock (&priv->mutex);
1073
1074 /* we have no data, we need some. We fire the signal with the size hint. */
1075 if (priv->callbacks.need_data)
1076 priv->callbacks.need_data (appsrc, size, priv->user_data);
1077 else if (emit)
1078 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1079 NULL);
1080
1081 g_mutex_lock (&priv->mutex);
1082 /* we can be flushing now because we released the lock */
1083 }
1084
1085 /* must be called with the appsrc mutex */
1086 static gboolean
gst_app_src_do_negotiate(GstBaseSrc * basesrc)1087 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1088 {
1089 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1090 GstAppSrcPrivate *priv = appsrc->priv;
1091 gboolean result;
1092 GstCaps *caps;
1093
1094 GST_OBJECT_LOCK (basesrc);
1095 caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1096 GST_OBJECT_UNLOCK (basesrc);
1097
1098 /* Avoid deadlock by unlocking mutex
1099 * otherwise we get deadlock between this and stream lock */
1100 g_mutex_unlock (&priv->mutex);
1101 if (caps) {
1102 result = gst_base_src_set_caps (basesrc, caps);
1103 gst_caps_unref (caps);
1104 } else {
1105 result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1106 }
1107 g_mutex_lock (&priv->mutex);
1108
1109 return result;
1110 }
1111
1112 static gboolean
gst_app_src_negotiate(GstBaseSrc * basesrc)1113 gst_app_src_negotiate (GstBaseSrc * basesrc)
1114 {
1115 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1116 GstAppSrcPrivate *priv = appsrc->priv;
1117 gboolean result;
1118
1119 g_mutex_lock (&priv->mutex);
1120 result = gst_app_src_do_negotiate (basesrc);
1121 g_mutex_unlock (&priv->mutex);
1122 return result;
1123 }
1124
1125 static GstFlowReturn
gst_app_src_create(GstBaseSrc * bsrc,guint64 offset,guint size,GstBuffer ** buf)1126 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1127 GstBuffer ** buf)
1128 {
1129 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1130 GstAppSrcPrivate *priv = appsrc->priv;
1131 GstFlowReturn ret;
1132
1133 GST_OBJECT_LOCK (appsrc);
1134 if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1135 bsrc->segment.format == GST_FORMAT_BYTES)) {
1136 GST_DEBUG_OBJECT (appsrc,
1137 "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1138 bsrc->segment.duration, priv->size);
1139 bsrc->segment.duration = priv->size;
1140 GST_OBJECT_UNLOCK (appsrc);
1141
1142 gst_element_post_message (GST_ELEMENT (appsrc),
1143 gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1144 } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1145 bsrc->segment.format == GST_FORMAT_TIME)) {
1146 GST_DEBUG_OBJECT (appsrc,
1147 "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1148 GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1149 bsrc->segment.duration = priv->duration;
1150 GST_OBJECT_UNLOCK (appsrc);
1151
1152 gst_element_post_message (GST_ELEMENT (appsrc),
1153 gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1154 } else {
1155 GST_OBJECT_UNLOCK (appsrc);
1156 }
1157
1158 g_mutex_lock (&priv->mutex);
1159 /* check flushing first */
1160 if (G_UNLIKELY (priv->flushing))
1161 goto flushing;
1162
1163 if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1164 /* if we are dealing with a random-access stream, issue a seek if the offset
1165 * changed. */
1166 if (G_UNLIKELY (priv->offset != offset)) {
1167 gboolean res;
1168
1169 /* do the seek */
1170 res = gst_app_src_emit_seek (appsrc, offset);
1171
1172 if (G_UNLIKELY (!res))
1173 /* failing to seek is fatal */
1174 goto seek_error;
1175
1176 priv->offset = offset;
1177 priv->is_eos = FALSE;
1178 }
1179 }
1180
1181 while (TRUE) {
1182 /* return data as long as we have some */
1183 if (!gst_queue_array_is_empty (priv->queue)) {
1184 guint buf_size;
1185 GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1186
1187 if (GST_IS_CAPS (obj)) {
1188 GstCaps *next_caps = GST_CAPS (obj);
1189 gboolean caps_changed = TRUE;
1190
1191 if (next_caps && priv->current_caps)
1192 caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1193 else
1194 caps_changed = (next_caps != priv->current_caps);
1195
1196 gst_caps_replace (&priv->current_caps, next_caps);
1197
1198 if (next_caps) {
1199 gst_caps_unref (next_caps);
1200 }
1201
1202 if (caps_changed)
1203 gst_app_src_do_negotiate (bsrc);
1204
1205 /* Lock has released so now may need
1206 *- flushing
1207 *- new caps change
1208 *- check queue has data */
1209 if (G_UNLIKELY (priv->flushing))
1210 goto flushing;
1211
1212 /* Continue checks caps and queue */
1213 continue;
1214 }
1215
1216 if (GST_IS_BUFFER (obj)) {
1217 *buf = GST_BUFFER (obj);
1218 buf_size = gst_buffer_get_size (*buf);
1219 GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size);
1220 } else {
1221 GstBufferList *buffer_list;
1222
1223 g_assert (GST_IS_BUFFER_LIST (obj));
1224
1225 buffer_list = GST_BUFFER_LIST (obj);
1226
1227 buf_size = gst_buffer_list_calculate_size (buffer_list);
1228
1229 GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers",
1230 buffer_list, buf_size, gst_buffer_list_length (buffer_list));
1231
1232 gst_base_src_submit_buffer_list (bsrc, buffer_list);
1233 *buf = NULL;
1234 }
1235
1236 priv->queued_bytes -= buf_size;
1237
1238 /* only update the offset when in random_access mode */
1239 if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1240 priv->offset += buf_size;
1241
1242 /* signal that we removed an item */
1243 if ((priv->wait_status & APP_WAITING))
1244 g_cond_broadcast (&priv->cond);
1245
1246 /* see if we go lower than the min-percent */
1247 if (priv->min_percent && priv->max_bytes) {
1248 if (priv->queued_bytes * 100 / priv->max_bytes <= priv->min_percent)
1249 /* ignore flushing state, we got a buffer and we will return it now.
1250 * Errors will be handled in the next round */
1251 gst_app_src_emit_need_data (appsrc, size);
1252 }
1253 ret = GST_FLOW_OK;
1254 break;
1255 } else {
1256 gst_app_src_emit_need_data (appsrc, size);
1257
1258 /* we can be flushing now because we released the lock above */
1259 if (G_UNLIKELY (priv->flushing))
1260 goto flushing;
1261
1262 /* if we have a buffer now, continue the loop and try to return it. In
1263 * random-access mode (where a buffer is normally pushed in the above
1264 * signal) we can still be empty because the pushed buffer got flushed or
1265 * when the application pushes the requested buffer later, we support both
1266 * possibilities. */
1267 if (!gst_queue_array_is_empty (priv->queue))
1268 continue;
1269
1270 /* no buffer yet, maybe we are EOS, if not, block for more data. */
1271 }
1272
1273 /* check EOS */
1274 if (G_UNLIKELY (priv->is_eos))
1275 goto eos;
1276
1277 /* nothing to return, wait a while for new data or flushing. */
1278 priv->wait_status |= STREAM_WAITING;
1279 g_cond_wait (&priv->cond, &priv->mutex);
1280 priv->wait_status &= ~STREAM_WAITING;
1281 }
1282 g_mutex_unlock (&priv->mutex);
1283 return ret;
1284
1285 /* ERRORS */
1286 flushing:
1287 {
1288 GST_DEBUG_OBJECT (appsrc, "we are flushing");
1289 g_mutex_unlock (&priv->mutex);
1290 return GST_FLOW_FLUSHING;
1291 }
1292 eos:
1293 {
1294 GST_DEBUG_OBJECT (appsrc, "we are EOS");
1295 g_mutex_unlock (&priv->mutex);
1296 return GST_FLOW_EOS;
1297 }
1298 seek_error:
1299 {
1300 g_mutex_unlock (&priv->mutex);
1301 GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1302 GST_ERROR_SYSTEM);
1303 return GST_FLOW_ERROR;
1304 }
1305 }
1306
1307 /* external API */
1308
1309 /**
1310 * gst_app_src_set_caps:
1311 * @appsrc: a #GstAppSrc
1312 * @caps: caps to set
1313 *
1314 * Set the capabilities on the appsrc element. This function takes
1315 * a copy of the caps structure. After calling this method, the source will
1316 * only produce caps that match @caps. @caps must be fixed and the caps on the
1317 * buffers must match the caps or left NULL.
1318 */
1319 void
gst_app_src_set_caps(GstAppSrc * appsrc,const GstCaps * caps)1320 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1321 {
1322 GstAppSrcPrivate *priv;
1323 gboolean caps_changed;
1324
1325 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1326
1327 priv = appsrc->priv;
1328
1329 g_mutex_lock (&priv->mutex);
1330
1331 GST_OBJECT_LOCK (appsrc);
1332 if (caps && priv->last_caps)
1333 caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1334 else
1335 caps_changed = (caps != priv->last_caps);
1336
1337 if (caps_changed) {
1338 GstCaps *new_caps;
1339 gpointer t;
1340
1341 new_caps = caps ? gst_caps_copy (caps) : NULL;
1342 GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1343
1344 while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1345 gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1346 }
1347 gst_queue_array_push_tail (priv->queue, new_caps);
1348 gst_caps_replace (&priv->last_caps, new_caps);
1349 }
1350
1351 GST_OBJECT_UNLOCK (appsrc);
1352
1353 g_mutex_unlock (&priv->mutex);
1354 }
1355
1356 /**
1357 * gst_app_src_get_caps:
1358 * @appsrc: a #GstAppSrc
1359 *
1360 * Get the configured caps on @appsrc.
1361 *
1362 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1363 */
1364 GstCaps *
gst_app_src_get_caps(GstAppSrc * appsrc)1365 gst_app_src_get_caps (GstAppSrc * appsrc)
1366 {
1367
1368 GstCaps *caps;
1369
1370 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1371
1372 GST_OBJECT_LOCK (appsrc);
1373 if ((caps = appsrc->priv->last_caps))
1374 gst_caps_ref (caps);
1375 GST_OBJECT_UNLOCK (appsrc);
1376
1377 return caps;
1378
1379 }
1380
1381 /**
1382 * gst_app_src_set_size:
1383 * @appsrc: a #GstAppSrc
1384 * @size: the size to set
1385 *
1386 * Set the size of the stream in bytes. A value of -1 means that the size is
1387 * not known.
1388 */
1389 void
gst_app_src_set_size(GstAppSrc * appsrc,gint64 size)1390 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1391 {
1392 GstAppSrcPrivate *priv;
1393
1394 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1395
1396 priv = appsrc->priv;
1397
1398 GST_OBJECT_LOCK (appsrc);
1399 GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1400 priv->size = size;
1401 GST_OBJECT_UNLOCK (appsrc);
1402 }
1403
1404 /**
1405 * gst_app_src_get_size:
1406 * @appsrc: a #GstAppSrc
1407 *
1408 * Get the size of the stream in bytes. A value of -1 means that the size is
1409 * not known.
1410 *
1411 * Returns: the size of the stream previously set with gst_app_src_set_size();
1412 */
1413 gint64
gst_app_src_get_size(GstAppSrc * appsrc)1414 gst_app_src_get_size (GstAppSrc * appsrc)
1415 {
1416 gint64 size;
1417 GstAppSrcPrivate *priv;
1418
1419 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1420
1421 priv = appsrc->priv;
1422
1423 GST_OBJECT_LOCK (appsrc);
1424 size = priv->size;
1425 GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1426 GST_OBJECT_UNLOCK (appsrc);
1427
1428 return size;
1429 }
1430
1431 /**
1432 * gst_app_src_set_duration:
1433 * @appsrc: a #GstAppSrc
1434 * @duration: the duration to set
1435 *
1436 * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1437 * not known.
1438 *
1439 * Since: 1.10
1440 */
1441 void
gst_app_src_set_duration(GstAppSrc * appsrc,GstClockTime duration)1442 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1443 {
1444 GstAppSrcPrivate *priv;
1445
1446 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1447
1448 priv = appsrc->priv;
1449
1450 GST_OBJECT_LOCK (appsrc);
1451 GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1452 GST_TIME_ARGS (duration));
1453 priv->duration = duration;
1454 GST_OBJECT_UNLOCK (appsrc);
1455 }
1456
1457 /**
1458 * gst_app_src_get_duration:
1459 * @appsrc: a #GstAppSrc
1460 *
1461 * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1462 * not known.
1463 *
1464 * Returns: the duration of the stream previously set with gst_app_src_set_duration();
1465 *
1466 * Since: 1.10
1467 */
1468 GstClockTime
gst_app_src_get_duration(GstAppSrc * appsrc)1469 gst_app_src_get_duration (GstAppSrc * appsrc)
1470 {
1471 GstClockTime duration;
1472 GstAppSrcPrivate *priv;
1473
1474 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
1475
1476 priv = appsrc->priv;
1477
1478 GST_OBJECT_LOCK (appsrc);
1479 duration = priv->duration;
1480 GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
1481 GST_TIME_ARGS (duration));
1482 GST_OBJECT_UNLOCK (appsrc);
1483
1484 return duration;
1485 }
1486
1487 /**
1488 * gst_app_src_set_stream_type:
1489 * @appsrc: a #GstAppSrc
1490 * @type: the new state
1491 *
1492 * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
1493 * be connected to.
1494 *
1495 * A stream_type stream
1496 */
1497 void
gst_app_src_set_stream_type(GstAppSrc * appsrc,GstAppStreamType type)1498 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
1499 {
1500 GstAppSrcPrivate *priv;
1501
1502 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1503
1504 priv = appsrc->priv;
1505
1506 GST_OBJECT_LOCK (appsrc);
1507 GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
1508 priv->stream_type = type;
1509 GST_OBJECT_UNLOCK (appsrc);
1510 }
1511
1512 /**
1513 * gst_app_src_get_stream_type:
1514 * @appsrc: a #GstAppSrc
1515 *
1516 * Get the stream type. Control the stream type of @appsrc
1517 * with gst_app_src_set_stream_type().
1518 *
1519 * Returns: the stream type.
1520 */
1521 GstAppStreamType
gst_app_src_get_stream_type(GstAppSrc * appsrc)1522 gst_app_src_get_stream_type (GstAppSrc * appsrc)
1523 {
1524 gboolean stream_type;
1525 GstAppSrcPrivate *priv;
1526
1527 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1528
1529 priv = appsrc->priv;
1530
1531 GST_OBJECT_LOCK (appsrc);
1532 stream_type = priv->stream_type;
1533 GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
1534 GST_OBJECT_UNLOCK (appsrc);
1535
1536 return stream_type;
1537 }
1538
1539 /**
1540 * gst_app_src_set_max_bytes:
1541 * @appsrc: a #GstAppSrc
1542 * @max: the maximum number of bytes to queue
1543 *
1544 * Set the maximum amount of bytes that can be queued in @appsrc.
1545 * After the maximum amount of bytes are queued, @appsrc will emit the
1546 * "enough-data" signal.
1547 */
1548 void
gst_app_src_set_max_bytes(GstAppSrc * appsrc,guint64 max)1549 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
1550 {
1551 GstAppSrcPrivate *priv;
1552
1553 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1554
1555 priv = appsrc->priv;
1556
1557 g_mutex_lock (&priv->mutex);
1558 if (max != priv->max_bytes) {
1559 GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
1560 priv->max_bytes = max;
1561 /* signal the change */
1562 g_cond_broadcast (&priv->cond);
1563 }
1564 g_mutex_unlock (&priv->mutex);
1565 }
1566
1567 /**
1568 * gst_app_src_get_max_bytes:
1569 * @appsrc: a #GstAppSrc
1570 *
1571 * Get the maximum amount of bytes that can be queued in @appsrc.
1572 *
1573 * Returns: The maximum amount of bytes that can be queued.
1574 */
1575 guint64
gst_app_src_get_max_bytes(GstAppSrc * appsrc)1576 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
1577 {
1578 guint64 result;
1579 GstAppSrcPrivate *priv;
1580
1581 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
1582
1583 priv = appsrc->priv;
1584
1585 g_mutex_lock (&priv->mutex);
1586 result = priv->max_bytes;
1587 GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
1588 g_mutex_unlock (&priv->mutex);
1589
1590 return result;
1591 }
1592
1593 /**
1594 * gst_app_src_get_current_level_bytes:
1595 * @appsrc: a #GstAppSrc
1596 *
1597 * Get the number of currently queued bytes inside @appsrc.
1598 *
1599 * Returns: The number of currently queued bytes.
1600 *
1601 * Since: 1.2
1602 */
1603 guint64
gst_app_src_get_current_level_bytes(GstAppSrc * appsrc)1604 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
1605 {
1606 gint64 queued;
1607 GstAppSrcPrivate *priv;
1608
1609 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1610
1611 priv = appsrc->priv;
1612
1613 GST_OBJECT_LOCK (appsrc);
1614 queued = priv->queued_bytes;
1615 GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
1616 queued);
1617 GST_OBJECT_UNLOCK (appsrc);
1618
1619 return queued;
1620 }
1621
1622 static void
gst_app_src_set_latencies(GstAppSrc * appsrc,gboolean do_min,guint64 min,gboolean do_max,guint64 max)1623 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
1624 gboolean do_max, guint64 max)
1625 {
1626 GstAppSrcPrivate *priv = appsrc->priv;
1627 gboolean changed = FALSE;
1628
1629 g_mutex_lock (&priv->mutex);
1630 if (do_min && priv->min_latency != min) {
1631 priv->min_latency = min;
1632 changed = TRUE;
1633 }
1634 if (do_max && priv->max_latency != max) {
1635 priv->max_latency = max;
1636 changed = TRUE;
1637 }
1638 g_mutex_unlock (&priv->mutex);
1639
1640 if (changed) {
1641 GST_DEBUG_OBJECT (appsrc, "posting latency changed");
1642 gst_element_post_message (GST_ELEMENT_CAST (appsrc),
1643 gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
1644 }
1645 }
1646
1647 /**
1648 * gst_app_src_set_latency:
1649 * @appsrc: a #GstAppSrc
1650 * @min: the min latency
1651 * @max: the max latency
1652 *
1653 * Configure the @min and @max latency in @src. If @min is set to -1, the
1654 * default latency calculations for pseudo-live sources will be used.
1655 */
1656 void
gst_app_src_set_latency(GstAppSrc * appsrc,guint64 min,guint64 max)1657 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
1658 {
1659 gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
1660 }
1661
1662 /**
1663 * gst_app_src_get_latency:
1664 * @appsrc: a #GstAppSrc
1665 * @min: (out): the min latency
1666 * @max: (out): the max latency
1667 *
1668 * Retrieve the min and max latencies in @min and @max respectively.
1669 */
1670 void
gst_app_src_get_latency(GstAppSrc * appsrc,guint64 * min,guint64 * max)1671 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
1672 {
1673 GstAppSrcPrivate *priv;
1674
1675 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1676
1677 priv = appsrc->priv;
1678
1679 g_mutex_lock (&priv->mutex);
1680 if (min)
1681 *min = priv->min_latency;
1682 if (max)
1683 *max = priv->max_latency;
1684 g_mutex_unlock (&priv->mutex);
1685 }
1686
1687 /**
1688 * gst_app_src_set_emit_signals:
1689 * @appsrc: a #GstAppSrc
1690 * @emit: the new state
1691 *
1692 * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
1693 * by default disabled because signal emission is expensive and unneeded when
1694 * the application prefers to operate in pull mode.
1695 */
1696 void
gst_app_src_set_emit_signals(GstAppSrc * appsrc,gboolean emit)1697 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
1698 {
1699 GstAppSrcPrivate *priv;
1700
1701 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1702
1703 priv = appsrc->priv;
1704
1705 g_mutex_lock (&priv->mutex);
1706 priv->emit_signals = emit;
1707 g_mutex_unlock (&priv->mutex);
1708 }
1709
1710 /**
1711 * gst_app_src_get_emit_signals:
1712 * @appsrc: a #GstAppSrc
1713 *
1714 * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
1715 *
1716 * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
1717 * signals.
1718 */
1719 gboolean
gst_app_src_get_emit_signals(GstAppSrc * appsrc)1720 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
1721 {
1722 gboolean result;
1723 GstAppSrcPrivate *priv;
1724
1725 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
1726
1727 priv = appsrc->priv;
1728
1729 g_mutex_lock (&priv->mutex);
1730 result = priv->emit_signals;
1731 g_mutex_unlock (&priv->mutex);
1732
1733 return result;
1734 }
1735
1736 static GstFlowReturn
gst_app_src_push_internal(GstAppSrc * appsrc,GstBuffer * buffer,GstBufferList * buflist,gboolean steal_ref)1737 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
1738 GstBufferList * buflist, gboolean steal_ref)
1739 {
1740 gboolean first = TRUE;
1741 GstAppSrcPrivate *priv;
1742
1743 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
1744
1745 priv = appsrc->priv;
1746
1747 if (buffer != NULL)
1748 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1749 else
1750 g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
1751
1752 if (buflist != NULL) {
1753 if (gst_buffer_list_length (buflist) == 0)
1754 return GST_FLOW_OK;
1755
1756 buffer = gst_buffer_list_get (buflist, 0);
1757 }
1758
1759 if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
1760 GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
1761 gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
1762 GstClock *clock;
1763
1764 clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
1765 if (clock) {
1766 GstClockTime now;
1767 GstClockTime base_time =
1768 gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
1769
1770 now = gst_clock_get_time (clock);
1771 if (now > base_time)
1772 now -= base_time;
1773 else
1774 now = 0;
1775 gst_object_unref (clock);
1776
1777 if (buflist == NULL) {
1778 if (!steal_ref) {
1779 buffer = gst_buffer_copy (buffer);
1780 steal_ref = TRUE;
1781 } else {
1782 buffer = gst_buffer_make_writable (buffer);
1783 }
1784 } else {
1785 if (!steal_ref) {
1786 buflist = gst_buffer_list_copy (buflist);
1787 steal_ref = TRUE;
1788 } else {
1789 buflist = gst_buffer_list_make_writable (buflist);
1790 }
1791 buffer = gst_buffer_list_get_writable (buflist, 0);
1792 }
1793
1794 GST_BUFFER_PTS (buffer) = now;
1795 GST_BUFFER_DTS (buffer) = now;
1796 } else {
1797 GST_WARNING_OBJECT (appsrc,
1798 "do-timestamp=TRUE but buffers are provided before "
1799 "reaching the PLAYING state and having a clock. Timestamps will "
1800 "not be accurate!");
1801 }
1802 }
1803
1804 g_mutex_lock (&priv->mutex);
1805
1806 while (TRUE) {
1807 /* can't accept buffers when we are flushing or EOS */
1808 if (priv->flushing)
1809 goto flushing;
1810
1811 if (priv->is_eos)
1812 goto eos;
1813
1814 if (priv->max_bytes && priv->queued_bytes >= priv->max_bytes) {
1815 GST_DEBUG_OBJECT (appsrc,
1816 "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
1817 priv->queued_bytes, priv->max_bytes);
1818
1819 if (first) {
1820 gboolean emit;
1821
1822 emit = priv->emit_signals;
1823 /* only signal on the first push */
1824 g_mutex_unlock (&priv->mutex);
1825
1826 if (priv->callbacks.enough_data)
1827 priv->callbacks.enough_data (appsrc, priv->user_data);
1828 else if (emit)
1829 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
1830 NULL);
1831
1832 g_mutex_lock (&priv->mutex);
1833 /* continue to check for flushing/eos after releasing the lock */
1834 first = FALSE;
1835 continue;
1836 }
1837 if (priv->block) {
1838 GST_DEBUG_OBJECT (appsrc, "waiting for free space");
1839 /* we are filled, wait until a buffer gets popped or when we
1840 * flush. */
1841 priv->wait_status |= APP_WAITING;
1842 g_cond_wait (&priv->cond, &priv->mutex);
1843 priv->wait_status &= ~APP_WAITING;
1844 } else {
1845 /* no need to wait for free space, we just pump more data into the
1846 * queue hoping that the caller reacts to the enough-data signal and
1847 * stops pushing buffers. */
1848 break;
1849 }
1850 } else
1851 break;
1852 }
1853
1854 if (buflist != NULL) {
1855 GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
1856 if (!steal_ref)
1857 gst_buffer_list_ref (buflist);
1858 gst_queue_array_push_tail (priv->queue, buflist);
1859 priv->queued_bytes += gst_buffer_list_calculate_size (buflist);
1860 } else {
1861 GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
1862 if (!steal_ref)
1863 gst_buffer_ref (buffer);
1864 gst_queue_array_push_tail (priv->queue, buffer);
1865 priv->queued_bytes += gst_buffer_get_size (buffer);
1866 }
1867
1868 if ((priv->wait_status & STREAM_WAITING))
1869 g_cond_broadcast (&priv->cond);
1870
1871 g_mutex_unlock (&priv->mutex);
1872
1873 return GST_FLOW_OK;
1874
1875 /* ERRORS */
1876 flushing:
1877 {
1878 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
1879 if (steal_ref)
1880 gst_buffer_unref (buffer);
1881 g_mutex_unlock (&priv->mutex);
1882 return GST_FLOW_FLUSHING;
1883 }
1884 eos:
1885 {
1886 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
1887 if (steal_ref)
1888 gst_buffer_unref (buffer);
1889 g_mutex_unlock (&priv->mutex);
1890 return GST_FLOW_EOS;
1891 }
1892 }
1893
1894 static GstFlowReturn
gst_app_src_push_buffer_full(GstAppSrc * appsrc,GstBuffer * buffer,gboolean steal_ref)1895 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
1896 gboolean steal_ref)
1897 {
1898 return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
1899 }
1900
1901 static GstFlowReturn
gst_app_src_push_sample_internal(GstAppSrc * appsrc,GstSample * sample)1902 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
1903 {
1904 GstBufferList *buffer_list;
1905 GstBuffer *buffer;
1906 GstCaps *caps;
1907
1908 g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
1909
1910 caps = gst_sample_get_caps (sample);
1911 if (caps != NULL) {
1912 gst_app_src_set_caps (appsrc, caps);
1913 } else {
1914 GST_WARNING_OBJECT (appsrc, "received sample without caps");
1915 }
1916
1917 buffer = gst_sample_get_buffer (sample);
1918 if (buffer != NULL)
1919 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
1920
1921 buffer_list = gst_sample_get_buffer_list (sample);
1922 if (buffer_list != NULL)
1923 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
1924
1925 GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
1926 return GST_FLOW_OK;
1927 }
1928
1929 /**
1930 * gst_app_src_push_buffer:
1931 * @appsrc: a #GstAppSrc
1932 * @buffer: (transfer full): a #GstBuffer to push
1933 *
1934 * Adds a buffer to the queue of buffers that the appsrc element will
1935 * push to its source pad. This function takes ownership of the buffer.
1936 *
1937 * When the block property is TRUE, this function can block until free
1938 * space becomes available in the queue.
1939 *
1940 * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1941 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1942 * #GST_FLOW_EOS when EOS occured.
1943 */
1944 GstFlowReturn
gst_app_src_push_buffer(GstAppSrc * appsrc,GstBuffer * buffer)1945 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
1946 {
1947 return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
1948 }
1949
1950 /**
1951 * gst_app_src_push_buffer_list:
1952 * @appsrc: a #GstAppSrc
1953 * @buffer_list: (transfer full): a #GstBufferList to push
1954 *
1955 * Adds a buffer list to the queue of buffers and buffer lists that the
1956 * appsrc element will push to its source pad. This function takes ownership
1957 * of @buffer_list.
1958 *
1959 * When the block property is TRUE, this function can block until free
1960 * space becomes available in the queue.
1961 *
1962 * Returns: #GST_FLOW_OK when the buffer list was successfuly queued.
1963 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1964 * #GST_FLOW_EOS when EOS occured.
1965 *
1966 * Since: 1.14
1967 */
1968 GstFlowReturn
gst_app_src_push_buffer_list(GstAppSrc * appsrc,GstBufferList * buffer_list)1969 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
1970 {
1971 return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
1972 }
1973
1974 /**
1975 * gst_app_src_push_sample:
1976 * @appsrc: a #GstAppSrc
1977 * @sample: (transfer none): a #GstSample from which buffer and caps may be
1978 * extracted
1979 *
1980 * Extract a buffer from the provided sample and adds it to the queue of
1981 * buffers that the appsrc element will push to its source pad. Any
1982 * previous caps that were set on appsrc will be replaced by the caps
1983 * associated with the sample if not equal.
1984 *
1985 * This function does not take ownership of the
1986 * sample so the sample needs to be unreffed after calling this function.
1987 *
1988 * When the block property is TRUE, this function can block until free
1989 * space becomes available in the queue.
1990 *
1991 * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
1992 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
1993 * #GST_FLOW_EOS when EOS occured.
1994 *
1995 * Since: 1.6
1996 *
1997 */
1998 GstFlowReturn
gst_app_src_push_sample(GstAppSrc * appsrc,GstSample * sample)1999 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2000 {
2001 return gst_app_src_push_sample_internal (appsrc, sample);
2002 }
2003
2004 /* push a buffer without stealing the ref of the buffer. This is used for the
2005 * action signal. */
2006 static GstFlowReturn
gst_app_src_push_buffer_action(GstAppSrc * appsrc,GstBuffer * buffer)2007 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2008 {
2009 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2010 }
2011
2012 /* push a buffer list without stealing the ref of the buffer list. This is
2013 * used for the action signal. */
2014 static GstFlowReturn
gst_app_src_push_buffer_list_action(GstAppSrc * appsrc,GstBufferList * buffer_list)2015 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2016 GstBufferList * buffer_list)
2017 {
2018 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2019 }
2020
2021 /* push a sample without stealing the ref. This is used for the
2022 * action signal. */
2023 static GstFlowReturn
gst_app_src_push_sample_action(GstAppSrc * appsrc,GstSample * sample)2024 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2025 {
2026 return gst_app_src_push_sample_internal (appsrc, sample);
2027 }
2028
2029 /**
2030 * gst_app_src_end_of_stream:
2031 * @appsrc: a #GstAppSrc
2032 *
2033 * Indicates to the appsrc element that the last buffer queued in the
2034 * element is the last buffer of the stream.
2035 *
2036 * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
2037 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2038 */
2039 GstFlowReturn
gst_app_src_end_of_stream(GstAppSrc * appsrc)2040 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2041 {
2042 GstAppSrcPrivate *priv;
2043
2044 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2045
2046 priv = appsrc->priv;
2047
2048 g_mutex_lock (&priv->mutex);
2049 /* can't accept buffers when we are flushing. We can accept them when we are
2050 * EOS although it will not do anything. */
2051 if (priv->flushing)
2052 goto flushing;
2053
2054 GST_DEBUG_OBJECT (appsrc, "sending EOS");
2055 priv->is_eos = TRUE;
2056 g_cond_broadcast (&priv->cond);
2057 g_mutex_unlock (&priv->mutex);
2058
2059 return GST_FLOW_OK;
2060
2061 /* ERRORS */
2062 flushing:
2063 {
2064 g_mutex_unlock (&priv->mutex);
2065 GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2066 return GST_FLOW_FLUSHING;
2067 }
2068 }
2069
2070 /**
2071 * gst_app_src_set_callbacks: (skip)
2072 * @appsrc: a #GstAppSrc
2073 * @callbacks: the callbacks
2074 * @user_data: a user_data argument for the callbacks
2075 * @notify: a destroy notify function
2076 *
2077 * Set callbacks which will be executed when data is needed, enough data has
2078 * been collected or when a seek should be performed.
2079 * This is an alternative to using the signals, it has lower overhead and is thus
2080 * less expensive, but also less flexible.
2081 *
2082 * If callbacks are installed, no signals will be emitted for performance
2083 * reasons.
2084 */
2085 void
gst_app_src_set_callbacks(GstAppSrc * appsrc,GstAppSrcCallbacks * callbacks,gpointer user_data,GDestroyNotify notify)2086 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2087 GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2088 {
2089 GDestroyNotify old_notify;
2090 GstAppSrcPrivate *priv;
2091
2092 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2093 g_return_if_fail (callbacks != NULL);
2094
2095 priv = appsrc->priv;
2096
2097 GST_OBJECT_LOCK (appsrc);
2098 old_notify = priv->notify;
2099
2100 if (old_notify) {
2101 gpointer old_data;
2102
2103 old_data = priv->user_data;
2104
2105 priv->user_data = NULL;
2106 priv->notify = NULL;
2107 GST_OBJECT_UNLOCK (appsrc);
2108
2109 old_notify (old_data);
2110
2111 GST_OBJECT_LOCK (appsrc);
2112 }
2113 priv->callbacks = *callbacks;
2114 priv->user_data = user_data;
2115 priv->notify = notify;
2116 GST_OBJECT_UNLOCK (appsrc);
2117 }
2118
2119 /*** GSTURIHANDLER INTERFACE *************************************************/
2120
2121 static GstURIType
gst_app_src_uri_get_type(GType type)2122 gst_app_src_uri_get_type (GType type)
2123 {
2124 return GST_URI_SRC;
2125 }
2126
2127 static const gchar *const *
gst_app_src_uri_get_protocols(GType type)2128 gst_app_src_uri_get_protocols (GType type)
2129 {
2130 static const gchar *protocols[] = { "appsrc", NULL };
2131
2132 return protocols;
2133 }
2134
2135 static gchar *
gst_app_src_uri_get_uri(GstURIHandler * handler)2136 gst_app_src_uri_get_uri (GstURIHandler * handler)
2137 {
2138 GstAppSrc *appsrc = GST_APP_SRC (handler);
2139
2140 return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
2141 }
2142
2143 static gboolean
gst_app_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)2144 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
2145 GError ** error)
2146 {
2147 GstAppSrc *appsrc = GST_APP_SRC (handler);
2148
2149 g_free (appsrc->priv->uri);
2150 appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
2151
2152 return TRUE;
2153 }
2154
2155 static void
gst_app_src_uri_handler_init(gpointer g_iface,gpointer iface_data)2156 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
2157 {
2158 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
2159
2160 iface->get_type = gst_app_src_uri_get_type;
2161 iface->get_protocols = gst_app_src_uri_get_protocols;
2162 iface->get_uri = gst_app_src_uri_get_uri;
2163 iface->set_uri = gst_app_src_uri_set_uri;
2164 }
2165
2166 static gboolean
gst_app_src_event(GstBaseSrc * src,GstEvent * event)2167 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
2168 {
2169 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
2170 GstAppSrcPrivate *priv = appsrc->priv;
2171
2172 switch (GST_EVENT_TYPE (event)) {
2173 case GST_EVENT_FLUSH_STOP:
2174 g_mutex_lock (&priv->mutex);
2175 priv->is_eos = FALSE;
2176 g_mutex_unlock (&priv->mutex);
2177 break;
2178 default:
2179 break;
2180 }
2181
2182 return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
2183 }
2184