1 /* GStreamer
2  * Copyright (C) <2009> Collabora Ltd
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
4  * Copyright (C) <2009> Nokia Inc
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:element-shmsrc
23  * @title: shmsrc
24  *
25  * Receive data from the shared memory sink.
26  *
27  * ## Example launch lines
28  * |[
29  * gst-launch-1.0 shmsrc socket-path=/tmp/blah ! \
30  * "video/x-raw, format=YUY2, color-matrix=sdtv, \
31  * chroma-site=mpeg2, width=(int)320, height=(int)240, framerate=(fraction)30/1" \
32  * ! queue ! videoconvert ! autovideosink
33  * ]| Render video from shm buffers.
34  *
35  */
36 
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40 
41 #include "gstshmsrc.h"
42 
43 #include <gst/gst.h>
44 
45 #include <string.h>
46 
/* signals */
/* No signals are declared here; LAST_SIGNAL is the only enumerator. */
enum
{
  LAST_SIGNAL
};
52 
/* properties */
/* Property ids used by set_property/get_property and installed in
 * gst_shm_src_class_init(). */
enum
{
  /* Placeholder so that real property ids start at 1 */
  PROP_0,
  /* "socket-path": path of the control socket (read/write) */
  PROP_SOCKET_PATH,
  /* "is-live": forwarded to gst_base_src_set_live()/gst_base_src_is_live() */
  PROP_IS_LIVE,
  /* "shm-area-name": name of the shared memory area (read-only) */
  PROP_SHM_AREA_NAME
};
61 
/* Per-buffer bookkeeping attached to each GstBuffer produced by
 * gst_shm_src_create() and released by free_buffer(). */
struct GstShmBuffer
{
  /* Data pointer obtained from sp_client_recv(); handed back with
   * sp_client_recv_finish() when the buffer is freed */
  char *buf;
  /* Pipe the data came from; one use_count reference is held for it */
  GstShmPipe *pipe;
};
67 
68 
/* Debug category for this element; initialised in gst_shm_src_class_init() */
GST_DEBUG_CATEGORY_STATIC (shmsrc_debug);
#define GST_CAT_DEFAULT shmsrc_debug

/* Always-present "src" pad accepting any caps */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
76 
/* Type registration: GstShmSrc derives from GstPushSrc.  parent_class is
 * aliased to the variable generated by G_DEFINE_TYPE. */
#define gst_shm_src_parent_class parent_class
G_DEFINE_TYPE (GstShmSrc, gst_shm_src, GST_TYPE_PUSH_SRC);

/* GObject vfuncs */
static void gst_shm_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_shm_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_shm_src_finalize (GObject * object);
/* GstBaseSrc / GstPushSrc vfuncs */
static gboolean gst_shm_src_start (GstBaseSrc * bsrc);
static gboolean gst_shm_src_stop (GstBaseSrc * bsrc);
static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc,
    GstBuffer ** outbuf);
static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc);
/* GstElement vfunc */
static GstStateChangeReturn gst_shm_src_change_state (GstElement * element,
    GstStateChange transition);

/* Drops one use_count reference on a GstShmPipe, freeing it at zero */
static void gst_shm_pipe_dec (GstShmPipe * pipe);
95 
96 static void
gst_shm_src_class_init(GstShmSrcClass * klass)97 gst_shm_src_class_init (GstShmSrcClass * klass)
98 {
99   GObjectClass *gobject_class;
100   GstElementClass *gstelement_class;
101   GstBaseSrcClass *gstbasesrc_class;
102   GstPushSrcClass *gstpush_src_class;
103 
104   gobject_class = (GObjectClass *) klass;
105   gstelement_class = (GstElementClass *) klass;
106   gstbasesrc_class = (GstBaseSrcClass *) klass;
107   gstpush_src_class = (GstPushSrcClass *) klass;
108 
109   gobject_class->set_property = gst_shm_src_set_property;
110   gobject_class->get_property = gst_shm_src_get_property;
111   gobject_class->finalize = gst_shm_src_finalize;
112 
113   gstelement_class->change_state = gst_shm_src_change_state;
114 
115   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
116   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
117   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock);
118   gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop);
119 
120   gstpush_src_class->create = gst_shm_src_create;
121 
122   g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
123       g_param_spec_string ("socket-path",
124           "Path to the control socket",
125           "The path to the control socket used to control the shared memory",
126           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127 
128   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
129       g_param_spec_boolean ("is-live", "Is this a live source",
130           "True if the element cannot produce data in PAUSED", FALSE,
131           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132 
133   g_object_class_install_property (gobject_class, PROP_SHM_AREA_NAME,
134       g_param_spec_string ("shm-area-name",
135           "Name of the shared memory area",
136           "The name of the shared memory area used to get buffers",
137           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
138 
139   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
140 
141   gst_element_class_set_static_metadata (gstelement_class,
142       "Shared Memory Source",
143       "Source",
144       "Receive data from the shared memory sink",
145       "Olivier Crete <olivier.crete@collabora.co.uk>");
146 
147   GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source");
148 }
149 
150 static void
gst_shm_src_init(GstShmSrc * self)151 gst_shm_src_init (GstShmSrc * self)
152 {
153   self->poll = gst_poll_new (TRUE);
154   gst_poll_fd_init (&self->pollfd);
155 }
156 
157 static void
gst_shm_src_finalize(GObject * object)158 gst_shm_src_finalize (GObject * object)
159 {
160   GstShmSrc *self = GST_SHM_SRC (object);
161 
162   gst_poll_free (self->poll);
163   g_free (self->socket_path);
164 
165   G_OBJECT_CLASS (parent_class)->finalize (object);
166 }
167 
168 
169 static void
gst_shm_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)170 gst_shm_src_set_property (GObject * object, guint prop_id,
171     const GValue * value, GParamSpec * pspec)
172 {
173   GstShmSrc *self = GST_SHM_SRC (object);
174 
175   switch (prop_id) {
176     case PROP_SOCKET_PATH:
177       GST_OBJECT_LOCK (object);
178       if (self->pipe) {
179         GST_WARNING_OBJECT (object, "Can not modify socket path while the "
180             "element is playing");
181       } else {
182         g_free (self->socket_path);
183         self->socket_path = g_value_dup_string (value);
184       }
185       GST_OBJECT_UNLOCK (object);
186       break;
187     case PROP_IS_LIVE:
188       gst_base_src_set_live (GST_BASE_SRC (object),
189           g_value_get_boolean (value));
190       break;
191     default:
192       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
193       break;
194   }
195 }
196 
197 static void
gst_shm_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)198 gst_shm_src_get_property (GObject * object, guint prop_id,
199     GValue * value, GParamSpec * pspec)
200 {
201   GstShmSrc *self = GST_SHM_SRC (object);
202 
203   switch (prop_id) {
204     case PROP_SOCKET_PATH:
205       GST_OBJECT_LOCK (object);
206       g_value_set_string (value, self->socket_path);
207       GST_OBJECT_UNLOCK (object);
208       break;
209     case PROP_IS_LIVE:
210       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (object)));
211       break;
212     case PROP_SHM_AREA_NAME:
213       GST_OBJECT_LOCK (object);
214       if (self->pipe)
215         g_value_set_string (value, sp_get_shm_area_name (self->pipe->pipe));
216       GST_OBJECT_UNLOCK (object);
217       break;
218     default:
219       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
220       break;
221   }
222 }
223 
224 static gboolean
gst_shm_src_start_reading(GstShmSrc * self)225 gst_shm_src_start_reading (GstShmSrc * self)
226 {
227   GstShmPipe *gstpipe;
228 
229   if (!self->socket_path) {
230     GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
231         ("No path specified for socket."), (NULL));
232     return FALSE;
233   }
234 
235   gstpipe = g_slice_new0 (GstShmPipe);
236   gstpipe->use_count = 1;
237   gstpipe->src = gst_object_ref (self);
238 
239   GST_DEBUG_OBJECT (self, "Opening socket %s", self->socket_path);
240 
241   GST_OBJECT_LOCK (self);
242   gstpipe->pipe = sp_client_open (self->socket_path);
243   GST_OBJECT_UNLOCK (self);
244 
245   if (!gstpipe->pipe) {
246     GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
247         ("Could not open socket %s: %d %s", self->socket_path, errno,
248             strerror (errno)), (NULL));
249     gst_shm_pipe_dec (gstpipe);
250     return FALSE;
251   }
252 
253   self->pipe = gstpipe;
254 
255   self->unlocked = FALSE;
256   gst_poll_set_flushing (self->poll, FALSE);
257 
258   gst_poll_fd_init (&self->pollfd);
259   self->pollfd.fd = sp_get_fd (self->pipe->pipe);
260   gst_poll_add_fd (self->poll, &self->pollfd);
261   gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE);
262 
263   return TRUE;
264 }
265 
266 static void
gst_shm_src_stop_reading(GstShmSrc * self)267 gst_shm_src_stop_reading (GstShmSrc * self)
268 {
269   GstShmPipe *pipe;
270 
271   GST_DEBUG_OBJECT (self, "Stopping %p", self);
272 
273   GST_OBJECT_LOCK (self);
274   pipe = self->pipe;
275   self->pipe = NULL;
276   GST_OBJECT_UNLOCK (self);
277 
278   if (pipe) {
279     gst_shm_pipe_dec (pipe);
280   }
281   gst_poll_set_flushing (self->poll, TRUE);
282 }
283 
284 static gboolean
gst_shm_src_start(GstBaseSrc * bsrc)285 gst_shm_src_start (GstBaseSrc * bsrc)
286 {
287   if (gst_base_src_is_live (bsrc))
288     return TRUE;
289   else
290     return gst_shm_src_start_reading (GST_SHM_SRC (bsrc));
291 }
292 
293 static gboolean
gst_shm_src_stop(GstBaseSrc * bsrc)294 gst_shm_src_stop (GstBaseSrc * bsrc)
295 {
296   if (!gst_base_src_is_live (bsrc))
297     gst_shm_src_stop_reading (GST_SHM_SRC (bsrc));
298 
299   return TRUE;
300 }
301 
302 
303 static void
free_buffer(gpointer data)304 free_buffer (gpointer data)
305 {
306   struct GstShmBuffer *gsb = data;
307   g_return_if_fail (gsb->pipe != NULL);
308   g_return_if_fail (gsb->pipe->src != NULL);
309 
310   GST_LOG ("Freeing buffer %p", gsb->buf);
311 
312   GST_OBJECT_LOCK (gsb->pipe->src);
313   sp_client_recv_finish (gsb->pipe->pipe, gsb->buf);
314   GST_OBJECT_UNLOCK (gsb->pipe->src);
315 
316   gst_shm_pipe_dec (gsb->pipe);
317 
318   g_slice_free (struct GstShmBuffer, gsb);
319 }
320 
321 static GstFlowReturn
gst_shm_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)322 gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
323 {
324   GstShmSrc *self = GST_SHM_SRC (psrc);
325   GstShmPipe *pipe;
326   gchar *buf = NULL;
327   int rv = 0;
328   struct GstShmBuffer *gsb;
329 
330   GST_DEBUG_OBJECT (self, "Stopping %p", self);
331 
332   GST_OBJECT_LOCK (self);
333   pipe = self->pipe;
334   if (!pipe) {
335     GST_OBJECT_UNLOCK (self);
336     return GST_FLOW_FLUSHING;
337   } else {
338     pipe->use_count++;
339   }
340   GST_OBJECT_UNLOCK (self);
341 
342   do {
343     if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) {
344       if (errno == EBUSY)
345         goto flushing;
346       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
347           ("Poll failed on fd: %s", strerror (errno)));
348       goto error;
349     }
350 
351     if (self->unlocked)
352       goto flushing;
353 
354     if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) {
355       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
356           ("Control socket has closed"));
357       goto error;
358     }
359 
360     if (gst_poll_fd_has_error (self->poll, &self->pollfd)) {
361       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
362           ("Control socket has error"));
363       goto error;
364     }
365 
366     if (gst_poll_fd_can_read (self->poll, &self->pollfd)) {
367       buf = NULL;
368       GST_LOG_OBJECT (self, "Reading from pipe");
369       GST_OBJECT_LOCK (self);
370       rv = sp_client_recv (pipe->pipe, &buf);
371       GST_OBJECT_UNLOCK (self);
372       if (rv < 0) {
373         GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
374             ("Error reading control data: %d", rv));
375         goto error;
376       }
377     }
378   } while (buf == NULL);
379 
380   GST_LOG_OBJECT (self, "Got buffer %p of size %d", buf, rv);
381 
382   gsb = g_slice_new0 (struct GstShmBuffer);
383   gsb->buf = buf;
384   gsb->pipe = pipe;
385 
386   *outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
387       buf, rv, 0, rv, gsb, free_buffer);
388 
389   return GST_FLOW_OK;
390 
391 error:
392   gst_shm_pipe_dec (pipe);
393   return GST_FLOW_ERROR;
394 flushing:
395   gst_shm_pipe_dec (pipe);
396   return GST_FLOW_FLUSHING;
397 }
398 
399 static GstStateChangeReturn
gst_shm_src_change_state(GstElement * element,GstStateChange transition)400 gst_shm_src_change_state (GstElement * element, GstStateChange transition)
401 {
402   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
403   GstShmSrc *self = GST_SHM_SRC (element);
404 
405   switch (transition) {
406     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
407       if (gst_base_src_is_live (GST_BASE_SRC (element))) {
408         if (!gst_shm_src_start_reading (self))
409           return GST_STATE_CHANGE_FAILURE;
410       }
411     default:
412       break;
413   }
414 
415   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
416   if (ret == GST_STATE_CHANGE_FAILURE)
417     return ret;
418 
419   switch (transition) {
420     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
421       if (gst_base_src_is_live (GST_BASE_SRC (element))) {
422         gst_shm_src_unlock (GST_BASE_SRC (element));
423         gst_shm_src_stop_reading (self);
424       }
425     default:
426       break;
427   }
428 
429   return ret;
430 }
431 
432 static gboolean
gst_shm_src_unlock(GstBaseSrc * bsrc)433 gst_shm_src_unlock (GstBaseSrc * bsrc)
434 {
435   GstShmSrc *self = GST_SHM_SRC (bsrc);
436 
437   self->unlocked = TRUE;
438   gst_poll_set_flushing (self->poll, TRUE);
439 
440   return TRUE;
441 }
442 
443 static gboolean
gst_shm_src_unlock_stop(GstBaseSrc * bsrc)444 gst_shm_src_unlock_stop (GstBaseSrc * bsrc)
445 {
446   GstShmSrc *self = GST_SHM_SRC (bsrc);
447 
448   self->unlocked = FALSE;
449   gst_poll_set_flushing (self->poll, FALSE);
450 
451   return TRUE;
452 }
453 
454 static void
gst_shm_pipe_dec(GstShmPipe * pipe)455 gst_shm_pipe_dec (GstShmPipe * pipe)
456 {
457   g_return_if_fail (pipe);
458   g_return_if_fail (pipe->src);
459   g_return_if_fail (pipe->use_count > 0);
460 
461   GST_OBJECT_LOCK (pipe->src);
462   pipe->use_count--;
463 
464   if (pipe->use_count > 0) {
465     GST_OBJECT_UNLOCK (pipe->src);
466     return;
467   }
468 
469   if (pipe->pipe)
470     sp_client_close (pipe->pipe);
471 
472   gst_poll_remove_fd (pipe->src->poll, &pipe->src->pollfd);
473   gst_poll_fd_init (&pipe->src->pollfd);
474 
475   GST_OBJECT_UNLOCK (pipe->src);
476 
477   gst_object_unref (pipe->src);
478   g_slice_free (GstShmPipe, pipe);
479 }
480