1 /* GStreamer
2 * Copyright (C) <2009> Collabora Ltd
3 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
4 * Copyright (C) <2009> Nokia Inc
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21 /**
22 * SECTION:element-shmsrc
23 * @title: shmsrc
24 *
25 * Receive data from the shared memory sink.
26 *
27 * ## Example launch lines
28 * |[
29 * gst-launch-1.0 shmsrc socket-path=/tmp/blah ! \
30 * "video/x-raw, format=YUY2, color-matrix=sdtv, \
31 * chroma-site=mpeg2, width=(int)320, height=(int)240, framerate=(fraction)30/1" \
32 * ! queue ! videoconvert ! autovideosink
33 * ]| Render video from shm buffers.
34 *
35 */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include "gstshmsrc.h"
42
43 #include <gst/gst.h>
44
45 #include <string.h>
46
/* Signal identifiers. Only the terminating entry is declared here. */
enum
{
  LAST_SIGNAL = 0
};
52
/* Property identifiers handled by the set_property/get_property handlers. */
enum
{
  PROP_0 = 0,                   /* placeholder; never installed as a property */
  PROP_SOCKET_PATH,             /* "socket-path" */
  PROP_IS_LIVE,                 /* "is-live" */
  PROP_SHM_AREA_NAME            /* "shm-area-name" (read-only) */
};
61
62 struct GstShmBuffer
63 {
64 char *buf;
65 GstShmPipe *pipe;
66 };
67
68
69 GST_DEBUG_CATEGORY_STATIC (shmsrc_debug);
70 #define GST_CAT_DEFAULT shmsrc_debug
71
72 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
73 GST_PAD_SRC,
74 GST_PAD_ALWAYS,
75 GST_STATIC_CAPS_ANY);
76
77 #define gst_shm_src_parent_class parent_class
78 G_DEFINE_TYPE (GstShmSrc, gst_shm_src, GST_TYPE_PUSH_SRC);
79
80 static void gst_shm_src_set_property (GObject * object, guint prop_id,
81 const GValue * value, GParamSpec * pspec);
82 static void gst_shm_src_get_property (GObject * object, guint prop_id,
83 GValue * value, GParamSpec * pspec);
84 static void gst_shm_src_finalize (GObject * object);
85 static gboolean gst_shm_src_start (GstBaseSrc * bsrc);
86 static gboolean gst_shm_src_stop (GstBaseSrc * bsrc);
87 static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc,
88 GstBuffer ** outbuf);
89 static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc);
90 static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc);
91 static GstStateChangeReturn gst_shm_src_change_state (GstElement * element,
92 GstStateChange transition);
93
94 static void gst_shm_pipe_dec (GstShmPipe * pipe);
95
96 static void
gst_shm_src_class_init(GstShmSrcClass * klass)97 gst_shm_src_class_init (GstShmSrcClass * klass)
98 {
99 GObjectClass *gobject_class;
100 GstElementClass *gstelement_class;
101 GstBaseSrcClass *gstbasesrc_class;
102 GstPushSrcClass *gstpush_src_class;
103
104 gobject_class = (GObjectClass *) klass;
105 gstelement_class = (GstElementClass *) klass;
106 gstbasesrc_class = (GstBaseSrcClass *) klass;
107 gstpush_src_class = (GstPushSrcClass *) klass;
108
109 gobject_class->set_property = gst_shm_src_set_property;
110 gobject_class->get_property = gst_shm_src_get_property;
111 gobject_class->finalize = gst_shm_src_finalize;
112
113 gstelement_class->change_state = gst_shm_src_change_state;
114
115 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
116 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
117 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock);
118 gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop);
119
120 gstpush_src_class->create = gst_shm_src_create;
121
122 g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
123 g_param_spec_string ("socket-path",
124 "Path to the control socket",
125 "The path to the control socket used to control the shared memory",
126 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127
128 g_object_class_install_property (gobject_class, PROP_IS_LIVE,
129 g_param_spec_boolean ("is-live", "Is this a live source",
130 "True if the element cannot produce data in PAUSED", FALSE,
131 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132
133 g_object_class_install_property (gobject_class, PROP_SHM_AREA_NAME,
134 g_param_spec_string ("shm-area-name",
135 "Name of the shared memory area",
136 "The name of the shared memory area used to get buffers",
137 NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
138
139 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
140
141 gst_element_class_set_static_metadata (gstelement_class,
142 "Shared Memory Source",
143 "Source",
144 "Receive data from the shared memory sink",
145 "Olivier Crete <olivier.crete@collabora.co.uk>");
146
147 GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source");
148 }
149
150 static void
gst_shm_src_init(GstShmSrc * self)151 gst_shm_src_init (GstShmSrc * self)
152 {
153 self->poll = gst_poll_new (TRUE);
154 gst_poll_fd_init (&self->pollfd);
155 }
156
157 static void
gst_shm_src_finalize(GObject * object)158 gst_shm_src_finalize (GObject * object)
159 {
160 GstShmSrc *self = GST_SHM_SRC (object);
161
162 gst_poll_free (self->poll);
163 g_free (self->socket_path);
164
165 G_OBJECT_CLASS (parent_class)->finalize (object);
166 }
167
168
169 static void
gst_shm_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)170 gst_shm_src_set_property (GObject * object, guint prop_id,
171 const GValue * value, GParamSpec * pspec)
172 {
173 GstShmSrc *self = GST_SHM_SRC (object);
174
175 switch (prop_id) {
176 case PROP_SOCKET_PATH:
177 GST_OBJECT_LOCK (object);
178 if (self->pipe) {
179 GST_WARNING_OBJECT (object, "Can not modify socket path while the "
180 "element is playing");
181 } else {
182 g_free (self->socket_path);
183 self->socket_path = g_value_dup_string (value);
184 }
185 GST_OBJECT_UNLOCK (object);
186 break;
187 case PROP_IS_LIVE:
188 gst_base_src_set_live (GST_BASE_SRC (object),
189 g_value_get_boolean (value));
190 break;
191 default:
192 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
193 break;
194 }
195 }
196
197 static void
gst_shm_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)198 gst_shm_src_get_property (GObject * object, guint prop_id,
199 GValue * value, GParamSpec * pspec)
200 {
201 GstShmSrc *self = GST_SHM_SRC (object);
202
203 switch (prop_id) {
204 case PROP_SOCKET_PATH:
205 GST_OBJECT_LOCK (object);
206 g_value_set_string (value, self->socket_path);
207 GST_OBJECT_UNLOCK (object);
208 break;
209 case PROP_IS_LIVE:
210 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (object)));
211 break;
212 case PROP_SHM_AREA_NAME:
213 GST_OBJECT_LOCK (object);
214 if (self->pipe)
215 g_value_set_string (value, sp_get_shm_area_name (self->pipe->pipe));
216 GST_OBJECT_UNLOCK (object);
217 break;
218 default:
219 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
220 break;
221 }
222 }
223
224 static gboolean
gst_shm_src_start_reading(GstShmSrc * self)225 gst_shm_src_start_reading (GstShmSrc * self)
226 {
227 GstShmPipe *gstpipe;
228
229 if (!self->socket_path) {
230 GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
231 ("No path specified for socket."), (NULL));
232 return FALSE;
233 }
234
235 gstpipe = g_slice_new0 (GstShmPipe);
236 gstpipe->use_count = 1;
237 gstpipe->src = gst_object_ref (self);
238
239 GST_DEBUG_OBJECT (self, "Opening socket %s", self->socket_path);
240
241 GST_OBJECT_LOCK (self);
242 gstpipe->pipe = sp_client_open (self->socket_path);
243 GST_OBJECT_UNLOCK (self);
244
245 if (!gstpipe->pipe) {
246 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
247 ("Could not open socket %s: %d %s", self->socket_path, errno,
248 strerror (errno)), (NULL));
249 gst_shm_pipe_dec (gstpipe);
250 return FALSE;
251 }
252
253 self->pipe = gstpipe;
254
255 self->unlocked = FALSE;
256 gst_poll_set_flushing (self->poll, FALSE);
257
258 gst_poll_fd_init (&self->pollfd);
259 self->pollfd.fd = sp_get_fd (self->pipe->pipe);
260 gst_poll_add_fd (self->poll, &self->pollfd);
261 gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE);
262
263 return TRUE;
264 }
265
266 static void
gst_shm_src_stop_reading(GstShmSrc * self)267 gst_shm_src_stop_reading (GstShmSrc * self)
268 {
269 GstShmPipe *pipe;
270
271 GST_DEBUG_OBJECT (self, "Stopping %p", self);
272
273 GST_OBJECT_LOCK (self);
274 pipe = self->pipe;
275 self->pipe = NULL;
276 GST_OBJECT_UNLOCK (self);
277
278 if (pipe) {
279 gst_shm_pipe_dec (pipe);
280 }
281 gst_poll_set_flushing (self->poll, TRUE);
282 }
283
284 static gboolean
gst_shm_src_start(GstBaseSrc * bsrc)285 gst_shm_src_start (GstBaseSrc * bsrc)
286 {
287 if (gst_base_src_is_live (bsrc))
288 return TRUE;
289 else
290 return gst_shm_src_start_reading (GST_SHM_SRC (bsrc));
291 }
292
293 static gboolean
gst_shm_src_stop(GstBaseSrc * bsrc)294 gst_shm_src_stop (GstBaseSrc * bsrc)
295 {
296 if (!gst_base_src_is_live (bsrc))
297 gst_shm_src_stop_reading (GST_SHM_SRC (bsrc));
298
299 return TRUE;
300 }
301
302
303 static void
free_buffer(gpointer data)304 free_buffer (gpointer data)
305 {
306 struct GstShmBuffer *gsb = data;
307 g_return_if_fail (gsb->pipe != NULL);
308 g_return_if_fail (gsb->pipe->src != NULL);
309
310 GST_LOG ("Freeing buffer %p", gsb->buf);
311
312 GST_OBJECT_LOCK (gsb->pipe->src);
313 sp_client_recv_finish (gsb->pipe->pipe, gsb->buf);
314 GST_OBJECT_UNLOCK (gsb->pipe->src);
315
316 gst_shm_pipe_dec (gsb->pipe);
317
318 g_slice_free (struct GstShmBuffer, gsb);
319 }
320
321 static GstFlowReturn
gst_shm_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)322 gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
323 {
324 GstShmSrc *self = GST_SHM_SRC (psrc);
325 GstShmPipe *pipe;
326 gchar *buf = NULL;
327 int rv = 0;
328 struct GstShmBuffer *gsb;
329
330 GST_DEBUG_OBJECT (self, "Stopping %p", self);
331
332 GST_OBJECT_LOCK (self);
333 pipe = self->pipe;
334 if (!pipe) {
335 GST_OBJECT_UNLOCK (self);
336 return GST_FLOW_FLUSHING;
337 } else {
338 pipe->use_count++;
339 }
340 GST_OBJECT_UNLOCK (self);
341
342 do {
343 if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) {
344 if (errno == EBUSY)
345 goto flushing;
346 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
347 ("Poll failed on fd: %s", strerror (errno)));
348 goto error;
349 }
350
351 if (self->unlocked)
352 goto flushing;
353
354 if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) {
355 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
356 ("Control socket has closed"));
357 goto error;
358 }
359
360 if (gst_poll_fd_has_error (self->poll, &self->pollfd)) {
361 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
362 ("Control socket has error"));
363 goto error;
364 }
365
366 if (gst_poll_fd_can_read (self->poll, &self->pollfd)) {
367 buf = NULL;
368 GST_LOG_OBJECT (self, "Reading from pipe");
369 GST_OBJECT_LOCK (self);
370 rv = sp_client_recv (pipe->pipe, &buf);
371 GST_OBJECT_UNLOCK (self);
372 if (rv < 0) {
373 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
374 ("Error reading control data: %d", rv));
375 goto error;
376 }
377 }
378 } while (buf == NULL);
379
380 GST_LOG_OBJECT (self, "Got buffer %p of size %d", buf, rv);
381
382 gsb = g_slice_new0 (struct GstShmBuffer);
383 gsb->buf = buf;
384 gsb->pipe = pipe;
385
386 *outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
387 buf, rv, 0, rv, gsb, free_buffer);
388
389 return GST_FLOW_OK;
390
391 error:
392 gst_shm_pipe_dec (pipe);
393 return GST_FLOW_ERROR;
394 flushing:
395 gst_shm_pipe_dec (pipe);
396 return GST_FLOW_FLUSHING;
397 }
398
399 static GstStateChangeReturn
gst_shm_src_change_state(GstElement * element,GstStateChange transition)400 gst_shm_src_change_state (GstElement * element, GstStateChange transition)
401 {
402 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
403 GstShmSrc *self = GST_SHM_SRC (element);
404
405 switch (transition) {
406 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
407 if (gst_base_src_is_live (GST_BASE_SRC (element))) {
408 if (!gst_shm_src_start_reading (self))
409 return GST_STATE_CHANGE_FAILURE;
410 }
411 default:
412 break;
413 }
414
415 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
416 if (ret == GST_STATE_CHANGE_FAILURE)
417 return ret;
418
419 switch (transition) {
420 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
421 if (gst_base_src_is_live (GST_BASE_SRC (element))) {
422 gst_shm_src_unlock (GST_BASE_SRC (element));
423 gst_shm_src_stop_reading (self);
424 }
425 default:
426 break;
427 }
428
429 return ret;
430 }
431
432 static gboolean
gst_shm_src_unlock(GstBaseSrc * bsrc)433 gst_shm_src_unlock (GstBaseSrc * bsrc)
434 {
435 GstShmSrc *self = GST_SHM_SRC (bsrc);
436
437 self->unlocked = TRUE;
438 gst_poll_set_flushing (self->poll, TRUE);
439
440 return TRUE;
441 }
442
443 static gboolean
gst_shm_src_unlock_stop(GstBaseSrc * bsrc)444 gst_shm_src_unlock_stop (GstBaseSrc * bsrc)
445 {
446 GstShmSrc *self = GST_SHM_SRC (bsrc);
447
448 self->unlocked = FALSE;
449 gst_poll_set_flushing (self->poll, FALSE);
450
451 return TRUE;
452 }
453
454 static void
gst_shm_pipe_dec(GstShmPipe * pipe)455 gst_shm_pipe_dec (GstShmPipe * pipe)
456 {
457 g_return_if_fail (pipe);
458 g_return_if_fail (pipe->src);
459 g_return_if_fail (pipe->use_count > 0);
460
461 GST_OBJECT_LOCK (pipe->src);
462 pipe->use_count--;
463
464 if (pipe->use_count > 0) {
465 GST_OBJECT_UNLOCK (pipe->src);
466 return;
467 }
468
469 if (pipe->pipe)
470 sp_client_close (pipe->pipe);
471
472 gst_poll_remove_fd (pipe->src->poll, &pipe->src->pollfd);
473 gst_poll_fd_init (&pipe->src->pollfd);
474
475 GST_OBJECT_UNLOCK (pipe->src);
476
477 gst_object_unref (pipe->src);
478 g_slice_free (GstShmPipe, pipe);
479 }
480