1 /* GStreamer
2  * Copyright (C) 2007 Nokia Corporation (contact <stefan.kost@nokia.com>)
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:element-rndbuffersize
21  *
22  * This element pulls buffers with random sizes from the source.
23  */
24 
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28 
29 #include <gst/gst.h>
30 #include <gst/base/gstadapter.h>
31 
32 GST_DEBUG_CATEGORY_STATIC (gst_rnd_buffer_size_debug);
33 #define GST_CAT_DEFAULT gst_rnd_buffer_size_debug
34 
35 #define GST_TYPE_RND_BUFFER_SIZE            (gst_rnd_buffer_size_get_type())
36 #define GST_RND_BUFFER_SIZE(obj)            (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RND_BUFFER_SIZE,GstRndBufferSize))
37 #define GST_RND_BUFFER_SIZE_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RND_BUFFER_SIZE,GstRndBufferSizeClass))
38 #define GST_IS_RND_BUFFER_SIZE(obj)         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RND_BUFFER_SIZE))
39 #define GST_IS_RND_BUFFER_SIZE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RND_BUFFER_SIZE))
40 
41 typedef struct _GstRndBufferSize GstRndBufferSize;
42 typedef struct _GstRndBufferSizeClass GstRndBufferSizeClass;
43 
44 struct _GstRndBufferSize
45 {
46   GstElement parent;
47 
48   /*< private > */
49   GRand *rand;
50   guint seed;
51   gint min, max;
52 
53   GstPad *sinkpad, *srcpad;
54   guint64 offset;
55 
56   gboolean need_newsegment;
57 
58   GstAdapter *adapter;
59 };
60 
61 struct _GstRndBufferSizeClass
62 {
63   GstElementClass parent_class;
64 };
65 
66 enum
67 {
68   PROP_SEED = 1,
69   PROP_MINIMUM,
70   PROP_MAXIMUM
71 };
72 
73 #define DEFAULT_SEED 0
74 #define DEFAULT_MIN  1
75 #define DEFAULT_MAX  (8*1024)
76 
77 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
78     GST_PAD_SRC,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS_ANY);
81 
82 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
83     GST_PAD_SINK,
84     GST_PAD_ALWAYS,
85     GST_STATIC_CAPS_ANY);
86 
87 static void gst_rnd_buffer_size_finalize (GObject * object);
88 static void gst_rnd_buffer_size_set_property (GObject * object, guint prop_id,
89     const GValue * value, GParamSpec * pspec);
90 static void gst_rnd_buffer_size_get_property (GObject * object, guint prop_id,
91     GValue * value, GParamSpec * pspec);
92 
93 static gboolean gst_rnd_buffer_size_activate (GstPad * pad, GstObject * parent);
94 static gboolean gst_rnd_buffer_size_activate_mode (GstPad * pad,
95     GstObject * parent, GstPadMode mode, gboolean active);
96 static void gst_rnd_buffer_size_loop (GstRndBufferSize * self);
97 static GstStateChangeReturn gst_rnd_buffer_size_change_state (GstElement *
98     element, GstStateChange transition);
99 static gboolean gst_rnd_buffer_size_src_event (GstPad * pad,
100     GstObject * parent, GstEvent * event);
101 static gboolean gst_rnd_buffer_size_sink_event (GstPad * pad,
102     GstObject * parent, GstEvent * event);
103 static GstFlowReturn gst_rnd_buffer_size_chain (GstPad * pad,
104     GstObject * parent, GstBuffer * buffer);
105 
106 GType gst_rnd_buffer_size_get_type (void);
107 #define gst_rnd_buffer_size_parent_class parent_class
108 G_DEFINE_TYPE (GstRndBufferSize, gst_rnd_buffer_size, GST_TYPE_ELEMENT);
109 
110 static void
gst_rnd_buffer_size_class_init(GstRndBufferSizeClass * klass)111 gst_rnd_buffer_size_class_init (GstRndBufferSizeClass * klass)
112 {
113   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
114   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
115 
116   GST_DEBUG_CATEGORY_INIT (gst_rnd_buffer_size_debug, "rndbuffersize", 0,
117       "rndbuffersize element");
118 
119   gobject_class->set_property = gst_rnd_buffer_size_set_property;
120   gobject_class->get_property = gst_rnd_buffer_size_get_property;
121   gobject_class->finalize = gst_rnd_buffer_size_finalize;
122 
123   gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
124   gst_element_class_add_static_pad_template (gstelement_class, &src_template);
125 
126   gst_element_class_set_static_metadata (gstelement_class, "Random buffer size",
127       "Testing", "pull random sized buffers",
128       "Stefan Kost <stefan.kost@nokia.com>");
129 
130   gstelement_class->change_state =
131       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_change_state);
132 
133   g_object_class_install_property (gobject_class, PROP_SEED,
134       g_param_spec_uint ("seed", "random number seed",
135           "seed for randomness (initialized when going from READY to PAUSED)",
136           0, G_MAXUINT32, DEFAULT_SEED,
137           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
138   g_object_class_install_property (gobject_class, PROP_MINIMUM,
139       g_param_spec_int ("min", "mininum", "mininum buffer size",
140           0, G_MAXINT32, DEFAULT_MIN,
141           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
142   g_object_class_install_property (gobject_class, PROP_MAXIMUM,
143       g_param_spec_int ("max", "maximum", "maximum buffer size",
144           1, G_MAXINT32, DEFAULT_MAX,
145           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
146 }
147 
148 static void
gst_rnd_buffer_size_init(GstRndBufferSize * self)149 gst_rnd_buffer_size_init (GstRndBufferSize * self)
150 {
151   self->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink");
152   gst_pad_set_activate_function (self->sinkpad,
153       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_activate));
154   gst_pad_set_activatemode_function (self->sinkpad,
155       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_activate_mode));
156   gst_pad_set_event_function (self->sinkpad,
157       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_sink_event));
158   gst_pad_set_chain_function (self->sinkpad,
159       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_chain));
160   GST_OBJECT_FLAG_SET (self->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
161   GST_OBJECT_FLAG_SET (self->sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
162   GST_OBJECT_FLAG_SET (self->sinkpad, GST_PAD_FLAG_PROXY_SCHEDULING);
163   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
164 
165   self->srcpad = gst_pad_new_from_static_template (&src_template, "src");
166   gst_pad_set_event_function (self->srcpad,
167       GST_DEBUG_FUNCPTR (gst_rnd_buffer_size_src_event));
168   GST_OBJECT_FLAG_SET (self->srcpad, GST_PAD_FLAG_PROXY_CAPS);
169   GST_OBJECT_FLAG_SET (self->srcpad, GST_PAD_FLAG_PROXY_ALLOCATION);
170   GST_OBJECT_FLAG_SET (self->srcpad, GST_PAD_FLAG_PROXY_SCHEDULING);
171   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
172 }
173 
174 
175 static void
gst_rnd_buffer_size_finalize(GObject * object)176 gst_rnd_buffer_size_finalize (GObject * object)
177 {
178   GstRndBufferSize *self = GST_RND_BUFFER_SIZE (object);
179 
180   if (self->rand) {
181     g_rand_free (self->rand);
182     self->rand = NULL;
183   }
184 
185   G_OBJECT_CLASS (parent_class)->finalize (object);
186 }
187 
188 
189 static void
gst_rnd_buffer_size_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)190 gst_rnd_buffer_size_set_property (GObject * object, guint prop_id,
191     const GValue * value, GParamSpec * pspec)
192 {
193   GstRndBufferSize *self = GST_RND_BUFFER_SIZE (object);
194 
195   switch (prop_id) {
196     case PROP_SEED:
197       self->seed = g_value_get_uint (value);
198       break;
199     case PROP_MINIMUM:
200       self->min = g_value_get_int (value);
201       break;
202     case PROP_MAXIMUM:
203       self->max = g_value_get_int (value);
204       break;
205     default:
206       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
207       break;
208   }
209 }
210 
211 
212 static void
gst_rnd_buffer_size_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)213 gst_rnd_buffer_size_get_property (GObject * object, guint prop_id,
214     GValue * value, GParamSpec * pspec)
215 {
216   GstRndBufferSize *self = GST_RND_BUFFER_SIZE (object);
217 
218   switch (prop_id) {
219     case PROP_SEED:
220       g_value_set_uint (value, self->seed);
221       break;
222     case PROP_MINIMUM:
223       g_value_set_int (value, self->min);
224       break;
225     case PROP_MAXIMUM:
226       g_value_set_int (value, self->max);
227       break;
228     default:
229       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
230       break;
231   }
232 }
233 
234 
235 static gboolean
gst_rnd_buffer_size_activate(GstPad * pad,GstObject * parent)236 gst_rnd_buffer_size_activate (GstPad * pad, GstObject * parent)
237 {
238   GstQuery *query;
239   gboolean pull_mode;
240 
241   query = gst_query_new_scheduling ();
242 
243   if (gst_pad_peer_query (pad, query))
244     pull_mode = gst_query_has_scheduling_mode_with_flags (query,
245         GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE);
246   else
247     pull_mode = FALSE;
248 
249   gst_query_unref (query);
250 
251   if (pull_mode) {
252     GST_DEBUG_OBJECT (pad, "activating pull");
253     return gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE);
254   } else {
255     GST_DEBUG_OBJECT (pad, "activating push");
256     return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
257   }
258 }
259 
260 
261 static gboolean
gst_rnd_buffer_size_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)262 gst_rnd_buffer_size_activate_mode (GstPad * pad, GstObject * parent,
263     GstPadMode mode, gboolean active)
264 {
265   gboolean res;
266   GstRndBufferSize *self = GST_RND_BUFFER_SIZE (parent);
267 
268   switch (mode) {
269     case GST_PAD_MODE_PULL:
270       if (active) {
271         GST_INFO_OBJECT (self, "starting pull");
272         res =
273             gst_pad_start_task (pad, (GstTaskFunction) gst_rnd_buffer_size_loop,
274             self, NULL);
275         self->need_newsegment = TRUE;
276       } else {
277         GST_INFO_OBJECT (self, "stopping pull");
278         res = gst_pad_stop_task (pad);
279       }
280       break;
281     case GST_PAD_MODE_PUSH:
282       GST_INFO_OBJECT (self, "%sactivating in push mode", (active) ? "" : "de");
283       res = TRUE;
284       break;
285     default:
286       res = FALSE;
287       break;
288   }
289   return res;
290 }
291 
292 static gboolean
gst_rnd_buffer_size_src_event(GstPad * pad,GstObject * parent,GstEvent * event)293 gst_rnd_buffer_size_src_event (GstPad * pad, GstObject * parent,
294     GstEvent * event)
295 {
296   GstRndBufferSize *self;
297   GstSeekType start_type;
298   GstSeekFlags flags;
299   GstFormat format;
300   gint64 start;
301 
302   if (GST_EVENT_TYPE (event) != GST_EVENT_SEEK) {
303     return gst_pad_event_default (pad, parent, event);
304   }
305 
306   self = GST_RND_BUFFER_SIZE (parent);
307   gst_event_parse_seek (event, NULL, &format, &flags, &start_type, &start,
308       NULL, NULL);
309 
310   if (format != GST_FORMAT_BYTES) {
311     GST_WARNING_OBJECT (pad, "only BYTE format supported");
312     return FALSE;
313   }
314   if (start_type != GST_SEEK_TYPE_SET) {
315     GST_WARNING_OBJECT (pad, "only SEEK_TYPE_SET supported");
316     return FALSE;
317   }
318 
319   if ((flags & GST_SEEK_FLAG_FLUSH)) {
320     gst_pad_push_event (self->srcpad, gst_event_new_flush_start ());
321     gst_pad_push_event (self->sinkpad, gst_event_new_flush_start ());
322   } else {
323     gst_pad_pause_task (self->sinkpad);
324   }
325 
326   GST_PAD_STREAM_LOCK (self->sinkpad);
327 
328   if ((flags & GST_SEEK_FLAG_FLUSH)) {
329     gst_pad_push_event (self->srcpad, gst_event_new_flush_stop (TRUE));
330     gst_pad_push_event (self->sinkpad, gst_event_new_flush_stop (TRUE));
331   }
332 
333   GST_INFO_OBJECT (pad, "seeking to offset %" G_GINT64_FORMAT, start);
334 
335   self->offset = start;
336   self->need_newsegment = TRUE;
337 
338   gst_pad_start_task (self->sinkpad, (GstTaskFunction) gst_rnd_buffer_size_loop,
339       self, NULL);
340 
341   GST_PAD_STREAM_UNLOCK (self->sinkpad);
342   return TRUE;
343 }
344 
345 static GstFlowReturn
gst_rnd_buffer_size_drain_adapter(GstRndBufferSize * self,gboolean eos)346 gst_rnd_buffer_size_drain_adapter (GstRndBufferSize * self, gboolean eos)
347 {
348   GstFlowReturn flow;
349   GstBuffer *buf;
350   guint num_bytes, avail;
351 
352   flow = GST_FLOW_OK;
353 
354   if (G_UNLIKELY (self->min > self->max))
355     goto bogus_minmax;
356 
357   do {
358     if (self->min != self->max) {
359       num_bytes = g_rand_int_range (self->rand, self->min, self->max);
360     } else {
361       num_bytes = self->min;
362     }
363 
364     GST_LOG_OBJECT (self, "pulling %u bytes out of adapter", num_bytes);
365 
366     buf = gst_adapter_take_buffer (self->adapter, num_bytes);
367 
368     if (buf == NULL) {
369       if (!eos) {
370         GST_LOG_OBJECT (self, "not enough bytes in adapter");
371         break;
372       }
373 
374       avail = gst_adapter_available (self->adapter);
375 
376       if (avail == 0)
377         break;
378 
379       if (avail < self->min) {
380         GST_WARNING_OBJECT (self, "discarding %u bytes at end (min=%u)",
381             avail, self->min);
382         gst_adapter_clear (self->adapter);
383         break;
384       }
385       buf = gst_adapter_take_buffer (self->adapter, avail);
386       g_assert (buf != NULL);
387     }
388 
389     flow = gst_pad_push (self->srcpad, buf);
390   }
391   while (flow == GST_FLOW_OK);
392 
393   return flow;
394 
395 /* ERRORS */
396 bogus_minmax:
397   {
398     GST_ELEMENT_ERROR (self, LIBRARY, SETTINGS,
399         ("The minimum buffer size is smaller than the maximum buffer size."),
400         ("buffer sizes: max=%d, min=%d", self->min, self->max));
401     return GST_FLOW_ERROR;
402   }
403 }
404 
405 static gboolean
gst_rnd_buffer_size_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)406 gst_rnd_buffer_size_sink_event (GstPad * pad, GstObject * parent,
407     GstEvent * event)
408 {
409   GstRndBufferSize *rnd = GST_RND_BUFFER_SIZE (parent);
410 
411   switch (GST_EVENT_TYPE (event)) {
412     case GST_EVENT_EOS:
413       gst_rnd_buffer_size_drain_adapter (rnd, TRUE);
414       break;
415     case GST_EVENT_FLUSH_STOP:
416       if (rnd->adapter != NULL)
417         gst_adapter_clear (rnd->adapter);
418       break;
419     default:
420       break;
421   }
422 
423   return gst_pad_event_default (pad, parent, event);
424 }
425 
426 static GstFlowReturn
gst_rnd_buffer_size_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)427 gst_rnd_buffer_size_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
428 {
429   GstRndBufferSize *rnd = GST_RND_BUFFER_SIZE (parent);
430   GstFlowReturn flow;
431 
432   if (rnd->adapter == NULL)
433     rnd->adapter = gst_adapter_new ();
434 
435   gst_adapter_push (rnd->adapter, buf);
436 
437   flow = gst_rnd_buffer_size_drain_adapter (rnd, FALSE);
438 
439   if (flow != GST_FLOW_OK)
440     GST_INFO_OBJECT (rnd, "flow: %s", gst_flow_get_name (flow));
441 
442   return flow;
443 }
444 
445 static void
gst_rnd_buffer_size_loop(GstRndBufferSize * self)446 gst_rnd_buffer_size_loop (GstRndBufferSize * self)
447 {
448   GstBuffer *buf = NULL;
449   GstFlowReturn ret;
450   guint num_bytes, size;
451 
452   if (G_UNLIKELY (self->min > self->max))
453     goto bogus_minmax;
454 
455   if (G_UNLIKELY (self->min != self->max)) {
456     num_bytes = g_rand_int_range (self->rand, self->min, self->max);
457   } else {
458     num_bytes = self->min;
459   }
460 
461   GST_LOG_OBJECT (self, "pulling %u bytes at offset %" G_GUINT64_FORMAT,
462       num_bytes, self->offset);
463 
464   ret = gst_pad_pull_range (self->sinkpad, self->offset, num_bytes, &buf);
465 
466   if (ret != GST_FLOW_OK)
467     goto pull_failed;
468 
469   size = gst_buffer_get_size (buf);
470 
471   if (size < num_bytes) {
472     GST_WARNING_OBJECT (self, "short buffer: %u bytes", size);
473   }
474 
475   if (self->need_newsegment) {
476     GstSegment segment;
477 
478     gst_segment_init (&segment, GST_FORMAT_BYTES);
479     segment.start = self->offset;
480     gst_pad_push_event (self->srcpad, gst_event_new_segment (&segment));
481     self->need_newsegment = FALSE;
482   }
483 
484   self->offset += size;
485 
486   ret = gst_pad_push (self->srcpad, buf);
487 
488   if (ret != GST_FLOW_OK)
489     goto push_failed;
490 
491   return;
492 
493 pause_task:
494   {
495     GST_DEBUG_OBJECT (self, "pausing task");
496     gst_pad_pause_task (self->sinkpad);
497     return;
498   }
499 
500 pull_failed:
501   {
502     if (ret == GST_FLOW_EOS) {
503       GST_DEBUG_OBJECT (self, "eos");
504       gst_pad_push_event (self->srcpad, gst_event_new_eos ());
505     } else {
506       GST_WARNING_OBJECT (self, "pull_range flow: %s", gst_flow_get_name (ret));
507     }
508     goto pause_task;
509   }
510 
511 push_failed:
512   {
513     GST_DEBUG_OBJECT (self, "push flow: %s", gst_flow_get_name (ret));
514     if (ret == GST_FLOW_EOS) {
515       GST_DEBUG_OBJECT (self, "eos");
516       gst_pad_push_event (self->srcpad, gst_event_new_eos ());
517     } else if (ret < GST_FLOW_EOS || ret == GST_FLOW_NOT_LINKED) {
518       GST_ELEMENT_FLOW_ERROR (self, ret);
519     }
520     goto pause_task;
521   }
522 
523 bogus_minmax:
524   {
525     GST_ELEMENT_ERROR (self, LIBRARY, SETTINGS,
526         ("The minimum buffer size is smaller than the maximum buffer size."),
527         ("buffer sizes: max=%d, min=%d", self->min, self->max));
528     goto pause_task;
529   }
530 }
531 
532 static GstStateChangeReturn
gst_rnd_buffer_size_change_state(GstElement * element,GstStateChange transition)533 gst_rnd_buffer_size_change_state (GstElement * element,
534     GstStateChange transition)
535 {
536   GstRndBufferSize *self = GST_RND_BUFFER_SIZE (element);
537   GstStateChangeReturn ret;
538 
539   switch (transition) {
540     case GST_STATE_CHANGE_NULL_TO_READY:
541       break;
542     case GST_STATE_CHANGE_READY_TO_PAUSED:
543       self->offset = 0;
544       if (!self->rand) {
545         self->rand = g_rand_new_with_seed (self->seed);
546       }
547       break;
548     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
549       break;
550     default:
551       break;
552   }
553 
554   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
555 
556   switch (transition) {
557     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
558       break;
559     case GST_STATE_CHANGE_PAUSED_TO_READY:
560       if (self->rand) {
561         g_rand_free (self->rand);
562         self->rand = NULL;
563       }
564       break;
565     case GST_STATE_CHANGE_READY_TO_NULL:
566       if (self->adapter) {
567         g_object_unref (self->adapter);
568         self->adapter = NULL;
569       }
570       break;
571     default:
572       break;
573   }
574 
575   return ret;
576 }
577