1 /* Aravis - Digital camera library
2  *
3  * Copyright © 2009-2010 Emmanuel Pacaud
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  *
20  * Author: Emmanuel Pacaud <emmanuel@gnome.org>
21  */
22 
23 /**
24  * SECTION: arvstream
25  * @short_description: Abstract base class for video stream reception
26  *
27  * #ArvStream provides an abstract base class for the implementation of video
28  * stream reception threads. The interface between the reception thread and the
29  * main thread is done using asynchronous queues, containing #ArvBuffer
30  * objects.
31  */
32 
33 #include <arvstreamprivate.h>
34 #include <arvbuffer.h>
35 #include <arvdebug.h>
36 
37 enum {
38 	ARV_STREAM_SIGNAL_NEW_BUFFER,
39 	ARV_STREAM_SIGNAL_LAST
40 } ArvStreamSignals;
41 
42 static guint arv_stream_signals[ARV_STREAM_SIGNAL_LAST] = {0};
43 
44 enum {
45 	ARV_STREAM_PROPERTY_0,
46 	ARV_STREAM_PROPERTY_EMIT_SIGNALS,
47 	ARV_STREAM_PROPERTY_LAST
48 } ArvStreamProperties;
49 
50 static GObjectClass *parent_class = NULL;
51 
52 struct _ArvStreamPrivate {
53 	GAsyncQueue *input_queue;
54 	GAsyncQueue *output_queue;
55 #if GLIB_CHECK_VERSION(2,32,0)
56 	GRecMutex mutex;
57 #else
58 	GRecMutex *mutex;
59 #endif
60 
61 	gboolean emit_signals;
62 };
63 
64 /**
65  * arv_stream_push_buffer:
66  * @stream: a #ArvStream
67  * @buffer: (transfer full): buffer to push
68  *
69  * Pushes a #ArvBuffer to the @stream thread. The @stream takes ownership of @buffer,
70  * and will free all the buffers still in its queues when destroyed.
71  *
72  * This method is thread safe.
73  *
74  * Since: 0.2.0
75  */
76 
77 void
arv_stream_push_buffer(ArvStream * stream,ArvBuffer * buffer)78 arv_stream_push_buffer (ArvStream *stream, ArvBuffer *buffer)
79 {
80 	g_return_if_fail (ARV_IS_STREAM (stream));
81 	g_return_if_fail (ARV_IS_BUFFER (buffer));
82 
83 	g_async_queue_push (stream->priv->input_queue, buffer);
84 }
85 
86 /**
87  * arv_stream_pop_buffer:
88  * @stream: a #ArvStream
89  *
90  * Pops a buffer from the output queue of @stream. The retrieved buffer
91  * may contain an invalid image. Caller should check the buffer status before using it.
92  * This function blocks until a buffer is available.
93  *
94  * This method is thread safe.
95  *
96  * Returns: (transfer full): a #ArvBuffer
97  *
98  * Since: 0.2.0
99  */
100 
101 ArvBuffer *
arv_stream_pop_buffer(ArvStream * stream)102 arv_stream_pop_buffer (ArvStream *stream)
103 {
104 	g_return_val_if_fail (ARV_IS_STREAM (stream), NULL);
105 
106 	return g_async_queue_pop (stream->priv->output_queue);
107 }
108 
109 /**
110  * arv_stream_try_pop_buffer:
111  * @stream: a #ArvStream
112  *
113  * Pops a buffer from the output queue of @stream. The retrieved buffer
114  * may contain an invalid image. Caller should check the buffer status before using it.
115  * This is the non blocking version of pop_buffer.
116  *
117  * This method is thread safe.
118  *
119  * Returns: (transfer full): a #ArvBuffer, NULL if no buffer is available.
120  *
121  * Since: 0.2.0
122  */
123 
124 ArvBuffer *
arv_stream_try_pop_buffer(ArvStream * stream)125 arv_stream_try_pop_buffer (ArvStream *stream)
126 {
127 	g_return_val_if_fail (ARV_IS_STREAM (stream), NULL);
128 
129 	return g_async_queue_try_pop (stream->priv->output_queue);
130 }
131 
132 /**
133  * arv_stream_timeout_pop_buffer:
134  * @stream: a #ArvStream
135  * @timeout: timeout, in µs
136  *
137  * Pops a buffer from the output queue of @stream, waiting no more than @timeout. The retrieved buffer
138  * may contain an invalid image. Caller should check the buffer status before using it.
139  *
140  * This method is thread safe.
141  *
142  * Returns: (transfer full): a #ArvBuffer, NULL if no buffer is available until the timeout occurs.
143  *
144  * Since: 0.2.0
145  */
146 
147 ArvBuffer *
arv_stream_timeout_pop_buffer(ArvStream * stream,guint64 timeout)148 arv_stream_timeout_pop_buffer (ArvStream *stream, guint64 timeout)
149 {
150 #if GLIB_CHECK_VERSION(2,32,0)
151 	g_return_val_if_fail (ARV_IS_STREAM (stream), NULL);
152 
153 	return g_async_queue_timeout_pop (stream->priv->output_queue, timeout);
154 #else
155 	GTimeVal end_time;
156 
157 	g_return_val_if_fail (ARV_IS_STREAM (stream), NULL);
158 
159 	g_get_current_time (&end_time);
160 	g_time_val_add (&end_time, timeout);
161 
162 	return g_async_queue_timed_pop (stream->priv->output_queue, &end_time);
163 #endif
164 }
165 
166 /**
167  * arv_stream_pop_input_buffer: (skip)
168  * @stream: (transfer full): a #ArvStream
169  *
170  * Pops a buffer from the input queue of @stream.
171  *
172  * Since: 0.2.0
173  */
174 
175 ArvBuffer *
arv_stream_pop_input_buffer(ArvStream * stream)176 arv_stream_pop_input_buffer (ArvStream *stream)
177 {
178 	g_return_val_if_fail (ARV_IS_STREAM (stream), NULL);
179 
180 	return g_async_queue_try_pop (stream->priv->input_queue);
181 }
182 
183 void
arv_stream_push_output_buffer(ArvStream * stream,ArvBuffer * buffer)184 arv_stream_push_output_buffer (ArvStream *stream, ArvBuffer *buffer)
185 {
186 	g_return_if_fail (ARV_IS_STREAM (stream));
187 	g_return_if_fail (ARV_IS_BUFFER (buffer));
188 
189 	g_async_queue_push (stream->priv->output_queue, buffer);
190 
191 #if GLIB_CHECK_VERSION(2,32,0)
192 	g_rec_mutex_lock (&stream->priv->mutex);
193 #else
194 	g_rec_mutex_lock (stream->priv->mutex);
195 #endif
196 
197 	if (stream->priv->emit_signals)
198 		g_signal_emit (stream, arv_stream_signals[ARV_STREAM_SIGNAL_NEW_BUFFER], 0);
199 
200 #if GLIB_CHECK_VERSION(2,32,0)
201 	g_rec_mutex_unlock (&stream->priv->mutex);
202 #else
203 	g_rec_mutex_unlock (stream->priv->mutex);
204 #endif
205 }
206 
207 /**
208  * arv_stream_get_n_buffers:
209  * @stream: a #ArvStream
210  * @n_input_buffers: (out) (allow-none): input queue length
211  * @n_output_buffers: (out) (allow-none): output queue length
212  *
213  * An accessor to the stream buffer queue lengths.
214  *
215  * Since: 0.2.0
216  */
217 
218 void
arv_stream_get_n_buffers(ArvStream * stream,gint * n_input_buffers,gint * n_output_buffers)219 arv_stream_get_n_buffers (ArvStream *stream, gint *n_input_buffers, gint *n_output_buffers)
220 {
221 	if (!ARV_IS_STREAM (stream)) {
222 		if (n_input_buffers != NULL)
223 			*n_input_buffers = 0;
224 		if (n_output_buffers != NULL)
225 			*n_output_buffers = 0;
226 		return;
227 	}
228 
229 	if (n_input_buffers != NULL)
230 		*n_input_buffers = g_async_queue_length (stream->priv->input_queue);
231 	if (n_output_buffers != NULL)
232 		*n_output_buffers = g_async_queue_length (stream->priv->output_queue);
233 }
234 
235 /**
236  * arv_stream_get_statistics:
237  * @stream: a #ArvStream
238  * @n_completed_buffers: (out) (allow-none): number of complete received buffers
239  * @n_failures: (out) (allow-none): number of reception failures
240  * @n_underruns: (out) (allow-none): number of input buffer underruns
241  *
242  * An accessor to the stream statistics.
243  *
244  * Since: 0.2.0
245  */
246 
247 void
arv_stream_get_statistics(ArvStream * stream,guint64 * n_completed_buffers,guint64 * n_failures,guint64 * n_underruns)248 arv_stream_get_statistics (ArvStream *stream,
249 			   guint64 *n_completed_buffers,
250 			   guint64 *n_failures,
251 			   guint64 *n_underruns)
252 {
253 	ArvStreamClass *stream_class;
254 	guint64 dummy;
255 
256 	if (n_completed_buffers == NULL)
257 		n_completed_buffers = &dummy;
258 	if (n_failures == NULL)
259 		n_failures = &dummy;
260 	if (n_underruns == NULL)
261 		n_underruns = &dummy;
262 
263 	*n_completed_buffers = 0;
264 	*n_failures = 0;
265 	*n_underruns = 0;
266 
267 	g_return_if_fail (ARV_IS_STREAM (stream));
268 
269 	stream_class = ARV_STREAM_GET_CLASS (stream);
270 	if (stream_class->get_statistics != NULL)
271 		stream_class->get_statistics (stream, n_completed_buffers, n_failures, n_underruns);
272 }
273 
274 /**
275  * arv_stream_set_emit_signals:
276  * @stream: a #ArvStream
277  * @emit_signals: the new state
278  *
279  * Make @stream emit signals. This option is
280  * by default disabled because signal emission is expensive and unneeded when
281  * the application prefers to operate in pull mode.
282  *
283  * Since: 0.2.0
284  */
285 
286 void
arv_stream_set_emit_signals(ArvStream * stream,gboolean emit_signals)287 arv_stream_set_emit_signals (ArvStream *stream, gboolean emit_signals)
288 {
289 	g_return_if_fail (ARV_IS_STREAM (stream));
290 
291 #if GLIB_CHECK_VERSION(2,32,0)
292 	g_rec_mutex_lock (&stream->priv->mutex);
293 #else
294 	g_rec_mutex_lock (stream->priv->mutex);
295 #endif
296 
297 	stream->priv->emit_signals = emit_signals;
298 
299 #if GLIB_CHECK_VERSION(2,32,0)
300 	g_rec_mutex_unlock (&stream->priv->mutex);
301 #else
302 	g_rec_mutex_unlock (stream->priv->mutex);
303 #endif
304 }
305 
306 /**
307  * arv_stream_get_emit_signals:
308  * @stream: a #ArvStream
309  *
310  * Check if stream will emit its signals.
311  *
312  * Returns: %TRUE if @stream is emiting its signals.
313  *
314  * Since: 0.2.0
315  */
316 
317 gboolean
arv_stream_get_emit_signals(ArvStream * stream)318 arv_stream_get_emit_signals (ArvStream *stream)
319 {
320 	gboolean ret;
321 	g_return_val_if_fail (ARV_IS_STREAM (stream), FALSE);
322 
323 #if GLIB_CHECK_VERSION(2,32,0)
324 	g_rec_mutex_lock (&stream->priv->mutex);
325 #else
326 	g_rec_mutex_lock (stream->priv->mutex);
327 #endif
328 
329 	ret = stream->priv->emit_signals;
330 
331 #if GLIB_CHECK_VERSION(2,32,0)
332 	g_rec_mutex_unlock (&stream->priv->mutex);
333 #else
334 	g_rec_mutex_unlock (stream->priv->mutex);
335 #endif
336 
337 	return ret;
338 }
339 
340 static void
arv_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)341 arv_stream_set_property (GObject * object, guint prop_id,
342 			 const GValue * value, GParamSpec * pspec)
343 {
344 	ArvStream *stream = ARV_STREAM (object);
345 
346 	switch (prop_id) {
347 		case ARV_STREAM_PROPERTY_EMIT_SIGNALS:
348 			arv_stream_set_emit_signals (stream, g_value_get_boolean (value));
349 			break;
350 		default:
351 			G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
352 			break;
353 	}
354 }
355 
356 static void
arv_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)357 arv_stream_get_property (GObject * object, guint prop_id,
358 			 GValue * value, GParamSpec * pspec)
359 {
360 	ArvStream *stream = ARV_STREAM (object);
361 
362 	switch (prop_id) {
363 		case ARV_STREAM_PROPERTY_EMIT_SIGNALS:
364 			g_value_set_boolean (value, arv_stream_get_emit_signals (stream));
365 			break;
366 		default:
367 			G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
368 			break;
369 	}
370 }
371 
372 static void
arv_stream_init(ArvStream * stream)373 arv_stream_init (ArvStream *stream)
374 {
375 	stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, ARV_TYPE_STREAM, ArvStreamPrivate);
376 
377 	stream->priv->input_queue = g_async_queue_new ();
378 	stream->priv->output_queue = g_async_queue_new ();
379 
380 	stream->priv->emit_signals = FALSE;
381 
382 #if GLIB_CHECK_VERSION(2,32,0)
383 	g_rec_mutex_init (&stream->priv->mutex);
384 #else
385 	stream->priv->mutex = g_rec_mutex_new ();
386 #endif
387 }
388 
389 static void
arv_stream_finalize(GObject * object)390 arv_stream_finalize (GObject *object)
391 {
392 	ArvStream *stream = ARV_STREAM (object);
393 	ArvBuffer *buffer;
394 
395 	arv_debug_stream ("[Stream::finalize] Flush %d buffer[s] in input queue",
396 			  g_async_queue_length (stream->priv->input_queue));
397 	arv_debug_stream ("[Stream::finalize] Flush %d buffer[s] in output queue",
398 			  g_async_queue_length (stream->priv->output_queue));
399 
400 	if (stream->priv->emit_signals) {
401 		g_warning ("Stream finalized with 'new-buffer' signal enabled");
402 		g_warning ("Please call arv_stream_set_emit_signals (stream, FALSE) before ArvStream object finalization");
403 	}
404 
405 	do {
406 		buffer = g_async_queue_try_pop (stream->priv->output_queue);
407 		if (buffer != NULL)
408 			g_object_unref (buffer);
409 	} while (buffer != NULL);
410 
411 	do {
412 		buffer = g_async_queue_try_pop (stream->priv->input_queue);
413 		if (buffer != NULL)
414 			g_object_unref (buffer);
415 	} while (buffer != NULL);
416 
417 	g_async_queue_unref (stream->priv->input_queue);
418 	g_async_queue_unref (stream->priv->output_queue);
419 
420 #if GLIB_CHECK_VERSION(2,32,0)
421 	g_rec_mutex_clear (&stream->priv->mutex);
422 #else
423 	g_rec_mutex_free (stream->priv->mutex);
424 #endif
425 
426 	parent_class->finalize (object);
427 }
428 
429 static void
arv_stream_class_init(ArvStreamClass * node_class)430 arv_stream_class_init (ArvStreamClass *node_class)
431 {
432 	GObjectClass *object_class = G_OBJECT_CLASS (node_class);
433 
434 	g_type_class_add_private (node_class, sizeof (ArvStreamPrivate));
435 
436 	parent_class = g_type_class_peek_parent (node_class);
437 
438 	object_class->finalize = arv_stream_finalize;
439 	object_class->set_property = arv_stream_set_property;
440 	object_class->get_property = arv_stream_get_property;
441 
442 	/**
443 	 * ArvStream::new-buffer:
444 	 * @stream: the stream that emited the signal
445 	 *
446 	 * Signal that a new buffer is available.
447 	 *
448 	 * This signal is emited from the stream receive thread and only when the
449 	 * "emit-signals" property is %TRUE.
450 	 *
451 	 * The new buffer can be retrieved with arv_stream_pop_buffer().
452 	 *
453 	 * Note that this signal is only emited when the "emit-signals" property is
454 	 * set to %TRUE, which it is not by default for performance reasons.
455 	 *
456 	 * Since: 0.2.0
457 	 */
458 
459 	arv_stream_signals[ARV_STREAM_SIGNAL_NEW_BUFFER] =
460 		g_signal_new ("new-buffer",
461 			      G_TYPE_FROM_CLASS (node_class),
462 			      G_SIGNAL_RUN_LAST,
463 			      G_STRUCT_OFFSET (ArvStreamClass, new_buffer),
464 			      NULL, NULL,
465 			      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
466 
467 	g_object_class_install_property (
468 		object_class, ARV_STREAM_PROPERTY_EMIT_SIGNALS,
469 		g_param_spec_boolean ("emit-signals", "Emit signals",
470 				      "Emit signals", FALSE,
471 				      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)
472 		);
473 }
474 
475 G_DEFINE_ABSTRACT_TYPE (ArvStream, arv_stream, G_TYPE_OBJECT)
476 
477