1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wim@fluendo.com>
4  *                    2006 Thomas Vander Stichele <thomas at apestaart dot org>
5  *               2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
6  *
7  * gstipcpipelinesrc.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:element-ipcpipelinesrc
26  * @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline
27  *
28  * Communicates with an ipcpipelinesink element in another process via a socket.
29  *
30  * The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink
31  * element on another process/pipeline. It is meant to run inside an
32  * interslavepipeline and when paired with an ipcpipelinesink, it will slave its
33  * whole parent pipeline to the "master" one, which contains the ipcpipelinesink.
34  *
35  * For more details about this mechanism and its uses, see the documentation
36  * of the ipcpipelinesink element.
37  */
38 
39 #ifdef HAVE_CONFIG_H
40 #  include "config.h"
41 #endif
42 
43 #include "gstipcpipelinesrc.h"
44 
45 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
46     GST_PAD_SRC,
47     GST_PAD_ALWAYS,
48     GST_STATIC_CAPS_ANY);
49 
50 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug);
51 #define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug
52 
53 enum
54 {
55   /* FILL ME */
56   SIGNAL_FORWARD_MESSAGE,
57   SIGNAL_DISCONNECT,
58   LAST_SIGNAL
59 };
60 static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 };
61 
62 enum
63 {
64   PROP_0,
65   PROP_FDIN,
66   PROP_FDOUT,
67   PROP_READ_CHUNK_SIZE,
68   PROP_ACK_TIME,
69   PROP_LAST,
70 };
71 
72 static GQuark QUARK_UPSTREAM;
73 
74 #define DEFAULT_READ_CHUNK_SIZE 65536
75 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
76 
77 #define _do_init \
78     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element");
79 #define gst_ipc_pipeline_src_parent_class parent_class
80 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src,
81     GST_TYPE_ELEMENT, _do_init);
82 
83 static void gst_ipc_pipeline_src_finalize (GObject * object);
84 static void gst_ipc_pipeline_src_dispose (GObject * object);
85 static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
86     const GValue * value, GParamSpec * pspec);
87 static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
88     GValue * value, GParamSpec * pspec);
89 
90 static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src);
91 
92 static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc *
93     src);
94 static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src);
95 
96 static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad,
97     GstObject * parent, GstPadMode mode, gboolean active);
98 static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad,
99     GstObject * parent, GstEvent * event);
100 static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad,
101     GstObject * parent, GstQuery * query);
102 static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src);
103 
104 static gboolean gst_ipc_pipeline_src_send_event (GstElement * element,
105     GstEvent * event);
106 static gboolean gst_ipc_pipeline_src_query (GstElement * element,
107     GstQuery * query);
108 static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement *
109     element, GstStateChange transition);
110 
111 static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src,
112     GstMessage * msg);
113 static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src);
114 
115 static void
gst_ipc_pipeline_src_class_init(GstIpcPipelineSrcClass * klass)116 gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass)
117 {
118   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
119   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120 
121   QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream");
122 
123   gobject_class->dispose = gst_ipc_pipeline_src_dispose;
124   gobject_class->finalize = gst_ipc_pipeline_src_finalize;
125 
126   gobject_class->set_property = gst_ipc_pipeline_src_set_property;
127   gobject_class->get_property = gst_ipc_pipeline_src_get_property;
128 
129   gstelement_class->send_event =
130       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event);
131   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query);
132   gstelement_class->change_state =
133       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state);
134 
135   klass->forward_message =
136       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message);
137   klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect);
138 
139   GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode);
140   GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event);
141   GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query);
142 
143   g_object_class_install_property (gobject_class, PROP_FDIN,
144       g_param_spec_int ("fdin", "Input file descriptor",
145           "File descriptor to read data from",
146           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
147   g_object_class_install_property (gobject_class, PROP_FDOUT,
148       g_param_spec_int ("fdout", "Output file descriptor",
149           "File descriptor to write data through",
150           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151   g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
152       g_param_spec_uint ("read-chunk-size", "Read chunk size",
153           "Read chunk size",
154           1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
155           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
156   g_object_class_install_property (gobject_class, PROP_ACK_TIME,
157       g_param_spec_uint64 ("ack-time", "Ack time",
158           "Maximum time to wait for a response to a message",
159           0, G_MAXUINT64, DEFAULT_ACK_TIME,
160           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
161 
162   gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] =
163       g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass),
164       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
165       G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL,
166       g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE);
167 
168   gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] =
169       g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass),
170       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
171       G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect),
172       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
173 
174   gst_element_class_set_static_metadata (gstelement_class,
175       "Inter-process Pipeline Source",
176       "Source",
177       "Continues a split pipeline from another process",
178       "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
179   gst_element_class_add_pad_template (gstelement_class,
180       gst_static_pad_template_get (&srctemplate));
181 }
182 
183 static void
gst_ipc_pipeline_src_init(GstIpcPipelineSrc * src)184 gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src)
185 {
186   GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
187 
188   gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src));
189   src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
190   src->comm.ack_time = DEFAULT_ACK_TIME;
191   src->flushing = TRUE;
192   src->last_ret = GST_FLOW_FLUSHING;
193   src->queued = NULL;
194   g_cond_init (&src->create_cond);
195 
196   src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
197   gst_pad_set_activatemode_function (src->srcpad,
198       gst_ipc_pipeline_src_activate_mode);
199   gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event);
200   gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query);
201   gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
202 
203   gst_ipc_pipeline_src_start_reader_thread (src);
204 }
205 
206 static void
gst_ipc_pipeline_src_dispose(GObject * object)207 gst_ipc_pipeline_src_dispose (GObject * object)
208 {
209   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
210 
211   gst_ipc_pipeline_src_stop_reader_thread (src);
212   gst_ipc_pipeline_src_cancel_queued (src);
213   gst_ipc_pipeline_comm_cancel (&src->comm, TRUE);
214 
215   G_OBJECT_CLASS (parent_class)->dispose (object);
216 }
217 
218 static void
gst_ipc_pipeline_src_finalize(GObject * object)219 gst_ipc_pipeline_src_finalize (GObject * object)
220 {
221   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
222 
223   gst_ipc_pipeline_comm_clear (&src->comm);
224   g_cond_clear (&src->create_cond);
225 
226   G_OBJECT_CLASS (parent_class)->finalize (object);
227 }
228 
229 static void
gst_ipc_pipeline_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)230 gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
231     const GValue * value, GParamSpec * pspec)
232 {
233   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
234 
235   switch (prop_id) {
236     case PROP_FDIN:
237       src->comm.fdin = g_value_get_int (value);
238       break;
239     case PROP_FDOUT:
240       src->comm.fdout = g_value_get_int (value);
241       break;
242     case PROP_READ_CHUNK_SIZE:
243       src->comm.read_chunk_size = g_value_get_uint (value);
244       break;
245     case PROP_ACK_TIME:
246       src->comm.ack_time = g_value_get_uint64 (value);
247       break;
248     default:
249       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
250       break;
251   }
252 }
253 
254 static void
gst_ipc_pipeline_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)255 gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
256     GValue * value, GParamSpec * pspec)
257 {
258   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
259 
260   g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object));
261 
262   switch (prop_id) {
263     case PROP_FDIN:
264       g_value_set_int (value, src->comm.fdin);
265       break;
266     case PROP_FDOUT:
267       g_value_set_int (value, src->comm.fdout);
268       break;
269     case PROP_READ_CHUNK_SIZE:
270       g_value_set_uint (value, src->comm.read_chunk_size);
271       break;
272     case PROP_ACK_TIME:
273       g_value_set_uint64 (value, src->comm.ack_time);
274       break;
275     default:
276       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
277       break;
278   }
279 }
280 
281 static void
gst_ipc_pipeline_src_log_queue(GstIpcPipelineSrc * src)282 gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src)
283 {
284   GList *queued;
285   guint n;
286 
287   queued = src->queued;
288   n = 0;
289   GST_LOG_OBJECT (src, "There are %u objects in the queue",
290       g_list_length (queued));
291   while (queued) {
292     void *object = queued->data;
293     if (GST_IS_EVENT (object)) {
294       GST_LOG_OBJECT (src, "  #%u: %s event", n, GST_EVENT_TYPE_NAME (object));
295     } else if (GST_IS_QUERY (object)) {
296       GST_LOG_OBJECT (src, "  #%u: %s query", n, GST_QUERY_TYPE_NAME (object));
297     } else if (GST_IS_BUFFER (object)) {
298       GST_LOG_OBJECT (src, "  #%u: %zu bytes buffer", n,
299           (size_t) gst_buffer_get_size (object));
300     } else {
301       GST_LOG_OBJECT (src, "  #%u: unknown item in queue", n);
302     }
303     queued = queued->next;
304     ++n;
305   }
306 }
307 
308 static void
gst_ipc_pipeline_src_cancel_queued(GstIpcPipelineSrc * src)309 gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src)
310 {
311   GList *queued;
312   guint32 id;
313 
314   g_mutex_lock (&src->comm.mutex);
315   queued = src->queued;
316   src->queued = NULL;
317   g_cond_broadcast (&src->create_cond);
318   g_mutex_unlock (&src->comm.mutex);
319 
320   while (queued) {
321     void *object = queued->data;
322 
323     id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
324             QUARK_ID));
325 
326     queued = g_list_delete_link (queued, queued);
327     if (GST_IS_EVENT (object)) {
328       GstEvent *event = GST_EVENT (object);
329       GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT,
330           event);
331       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
332       gst_event_unref (event);
333     } else if (GST_IS_BUFFER (object)) {
334       GstBuffer *buffer = GST_BUFFER (object);
335       GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT,
336           buffer);
337       gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
338           GST_FLOW_FLUSHING);
339       gst_buffer_unref (buffer);
340     } else if (GST_IS_QUERY (object)) {
341       GstQuery *query = GST_QUERY (object);
342       GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT,
343           query);
344       gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE,
345           query);
346       gst_query_unref (query);
347     }
348   }
349 
350 }
351 
352 static void
gst_ipc_pipeline_src_start_loop(GstIpcPipelineSrc * src)353 gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src)
354 {
355   g_mutex_lock (&src->comm.mutex);
356   src->flushing = FALSE;
357   src->last_ret = GST_FLOW_OK;
358   g_mutex_unlock (&src->comm.mutex);
359 
360   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop,
361       src, NULL);
362 }
363 
364 static void
gst_ipc_pipeline_src_stop_loop(GstIpcPipelineSrc * src,gboolean stop)365 gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop)
366 {
367   g_mutex_lock (&src->comm.mutex);
368   src->flushing = TRUE;
369   g_cond_broadcast (&src->create_cond);
370   g_mutex_unlock (&src->comm.mutex);
371 
372   if (stop)
373     gst_pad_stop_task (src->srcpad);
374 }
375 
376 static gboolean
gst_ipc_pipeline_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)377 gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent,
378     GstPadMode mode, gboolean active)
379 {
380   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
381 
382   switch (mode) {
383     case GST_PAD_MODE_PUSH:
384       GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" :
385           "deactivating");
386       if (active) {
387         gst_ipc_pipeline_src_start_loop (src);
388       } else {
389         gst_ipc_pipeline_src_stop_loop (src, TRUE);
390         gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
391       }
392       return TRUE;
393     default:
394       GST_DEBUG_OBJECT (pad, "unsupported activation mode");
395       return FALSE;
396   }
397 }
398 
399 static gboolean
gst_ipc_pipeline_src_srcpad_event(GstPad * pad,GstObject * parent,GstEvent * event)400 gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent,
401     GstEvent * event)
402 {
403   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
404   gboolean ret;
405 
406   GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event));
407 
408   ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event);
409   gst_event_unref (event);
410 
411   GST_DEBUG_OBJECT (src, "Returning event result: %d", ret);
412   return ret;
413 }
414 
415 static gboolean
gst_ipc_pipeline_src_srcpad_query(GstPad * pad,GstObject * parent,GstQuery * query)416 gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent,
417     GstQuery * query)
418 {
419   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
420   gboolean ret;
421 
422   /* answer some queries that do not make sense to be forwarded */
423   switch (GST_QUERY_TYPE (query)) {
424     case GST_QUERY_LATENCY:
425       return TRUE;
426     case GST_QUERY_CONTEXT:
427       return FALSE;
428     case GST_QUERY_CAPS:
429     {
430       /* caps queries occur even while linking the pipeline.
431        * It is possible that the ipcpipelinesink may not be connected at this
432        * point, so let's avoid a couple of errors... */
433       GstState state;
434       GST_OBJECT_LOCK (src);
435       state = GST_STATE (src);
436       GST_OBJECT_UNLOCK (src);
437       if (state == GST_STATE_NULL)
438         return FALSE;
439     }
440     default:
441       break;
442   }
443 
444   GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT,
445       GST_QUERY_TYPE_NAME (query), query);
446 
447   ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query);
448 
449   GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT,
450       ret, query);
451   return ret;
452 }
453 
454 static void
gst_ipc_pipeline_src_loop(GstIpcPipelineSrc * src)455 gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src)
456 {
457   gpointer object;
458   guint32 id;
459   gboolean ok;
460   GstFlowReturn ret = GST_FLOW_OK;
461 
462   g_mutex_lock (&src->comm.mutex);
463 
464   while (!src->queued && !src->flushing)
465     g_cond_wait (&src->create_cond, &src->comm.mutex);
466 
467   if (src->flushing)
468     goto out;
469 
470   object = src->queued->data;
471   src->queued = g_list_delete_link (src->queued, src->queued);
472   g_mutex_unlock (&src->comm.mutex);
473 
474   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
475           QUARK_ID));
476 
477   if (GST_IS_BUFFER (object)) {
478     GstBuffer *buf = GST_BUFFER (object);
479     GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf);
480     ret = gst_pad_push (src->srcpad, buf);
481     GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id,
482         gst_flow_get_name (ret));
483     gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret);
484   } else if (GST_IS_EVENT (object)) {
485     GstEvent *event = GST_EVENT (object);
486     GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event);
487     ok = gst_pad_push_event (src->srcpad, event);
488     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
489   } else if (GST_IS_QUERY (object)) {
490     GstQuery *query = GST_QUERY (object);
491     GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query);
492     ok = gst_pad_peer_query (src->srcpad, query);
493     gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query);
494     gst_query_unref (query);
495   } else {
496     GST_WARNING_OBJECT (src, "Unknown data type queued");
497   }
498 
499   g_mutex_lock (&src->comm.mutex);
500   if (!src->queued)
501     g_cond_broadcast (&src->create_cond);
502 out:
503   if (src->flushing)
504     ret = GST_FLOW_FLUSHING;
505   if (ret != GST_FLOW_OK)
506     src->last_ret = ret;
507   g_mutex_unlock (&src->comm.mutex);
508 
509   if (ret == GST_FLOW_FLUSHING) {
510     gst_ipc_pipeline_src_cancel_queued (src);
511     gst_pad_pause_task (src->srcpad);
512   }
513 }
514 
515 static gboolean
gst_ipc_pipeline_src_send_event(GstElement * element,GstEvent * event)516 gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event)
517 {
518   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
519   return gst_pad_push_event (src->srcpad, event);
520 }
521 
522 static gboolean
gst_ipc_pipeline_src_query(GstElement * element,GstQuery * query)523 gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query)
524 {
525   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
526   return gst_pad_query (src->srcpad, query);
527 }
528 
529 static GstElement *
find_pipeline(GstElement * element)530 find_pipeline (GstElement * element)
531 {
532   GstElement *pipeline = element;
533   while (GST_ELEMENT_PARENT (pipeline)) {
534     pipeline = GST_ELEMENT_PARENT (pipeline);
535     if (GST_IS_PIPELINE (pipeline))
536       break;
537   }
538   if (!pipeline || !GST_IS_PIPELINE (pipeline)) {
539     pipeline = NULL;
540   }
541   return pipeline;
542 }
543 
544 static gboolean
gst_ipc_pipeline_src_forward_message(GstIpcPipelineSrc * src,GstMessage * msg)545 gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg)
546 {
547   gboolean skip = FALSE;
548 
549   GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg);
550 
551   switch (GST_MESSAGE_TYPE (msg)) {
552     case GST_MESSAGE_STATE_CHANGED:
553     {
554       GstState old, new, pending;
555       GstElement *pipeline = find_pipeline (GST_ELEMENT (src));
556 
557       gst_message_parse_state_changed (msg, &old, &new, &pending);
558 
559       if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) &&
560           old == new && new == pending) {
561         GST_DEBUG_OBJECT (src, "Detected lost state, notifying master");
562         gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm);
563       }
564       /* fall through & skip */
565     }
566     case GST_MESSAGE_ASYNC_START:
567     case GST_MESSAGE_CLOCK_PROVIDE:
568     case GST_MESSAGE_CLOCK_LOST:
569     case GST_MESSAGE_NEW_CLOCK:
570     case GST_MESSAGE_STREAM_STATUS:
571     case GST_MESSAGE_NEED_CONTEXT:
572     case GST_MESSAGE_HAVE_CONTEXT:
573     case GST_MESSAGE_STRUCTURE_CHANGE:
574       skip = TRUE;
575       break;
576     case GST_MESSAGE_RESET_TIME:
577     {
578       GQuark ipcpipelinesrc_posted = g_quark_from_static_string
579           ("gstinterslavepipeline-message-already-posted");
580 
581       skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg),
582               ipcpipelinesrc_posted));
583       if (!skip) {
584         gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted,
585             GUINT_TO_POINTER (1), NULL);
586       }
587       break;
588     }
589     case GST_MESSAGE_ERROR:
590     {
591       GError *error = NULL;
592 
593       /* skip forwarding a RESOURCE/WRITE error message that originated from
594        * ipcpipelinesrc; we post this error when writing to the comm fd fails,
595        * so if we try to forward it here, we will likely get another one posted
596        * immediately and end up doing an endless loop */
597       gst_message_parse_error (msg, &error, NULL);
598       skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src)
599           && error->domain == gst_resource_error_quark ()
600           && error->code == GST_RESOURCE_ERROR_WRITE);
601       g_error_free (error);
602       break;
603     }
604     default:
605       break;
606   }
607 
608   if (skip) {
609     GST_DEBUG_OBJECT (src, "message will not be forwarded");
610     return TRUE;
611   }
612 
613   return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg);
614 }
615 
616 static void
on_buffer(guint32 id,GstBuffer * buffer,gpointer user_data)617 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
618 {
619   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
620   GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id,
621       buffer);
622   g_mutex_lock (&src->comm.mutex);
623   if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) {
624     g_mutex_unlock (&src->comm.mutex);
625     GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored");
626     gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
627         GST_FLOW_FLUSHING);
628     gst_buffer_unref (buffer);
629     return;
630   }
631   if (src->last_ret != GST_FLOW_OK) {
632     GstFlowReturn last_ret = src->last_ret;
633     g_mutex_unlock (&src->comm.mutex);
634     GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer",
635         gst_flow_get_name (last_ret));
636     gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret);
637     gst_buffer_unref (buffer);
638     return;
639   }
640   src->queued = g_list_append (src->queued, buffer);    /* keep the ref */
641   gst_ipc_pipeline_src_log_queue (src);
642   g_cond_broadcast (&src->create_cond);
643   g_mutex_unlock (&src->comm.mutex);
644 }
645 
646 static void
do_oob_event(GstElement * element,gpointer user_data)647 do_oob_event (GstElement * element, gpointer user_data)
648 {
649   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
650   GstEvent *event = user_data;
651   gboolean ret, upstream;
652   guint32 id;
653 
654   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
655           (event), QUARK_ID));
656   upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
657           (event), QUARK_UPSTREAM));
658 
659   if (upstream) {
660     GstElement *pipeline;
661     gboolean ok = FALSE;
662 
663     if (!(pipeline = find_pipeline (element))) {
664       GST_ERROR_OBJECT (src, "No pipeline found");
665       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
666     } else {
667       GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %"
668           GST_PTR_FORMAT, event);
669       ok = gst_element_send_event (pipeline, gst_event_ref (event));
670       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
671     }
672   } else {
673     GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event);
674     ret = gst_element_send_event (element, gst_event_ref (event));
675     GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret);
676     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret);
677   }
678 }
679 
680 static void
on_event(guint32 id,GstEvent * event,gboolean upstream,gpointer user_data)681 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
682 {
683   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
684   GstFlowReturn last_ret = GST_FLOW_OK;
685 
686   GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id,
687       event);
688 
689   gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM,
690       GINT_TO_POINTER (upstream), NULL);
691 
692   switch (GST_EVENT_TYPE (event)) {
693     case GST_EVENT_FLUSH_START:
694       gst_ipc_pipeline_src_stop_loop (src, FALSE);
695       break;
696     case GST_EVENT_FLUSH_STOP:
697       gst_ipc_pipeline_src_start_loop (src);
698       break;
699     default:
700       g_mutex_lock (&src->comm.mutex);
701       last_ret = src->last_ret;
702       g_mutex_unlock (&src->comm.mutex);
703       break;
704   }
705 
706   if (GST_EVENT_IS_SERIALIZED (event) && !upstream) {
707     if (last_ret != GST_FLOW_OK) {
708       GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
709           gst_flow_get_name (last_ret));
710       gst_event_unref (event);
711       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
712     } else {
713       GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p",
714           src->queued);
715       g_mutex_lock (&src->comm.mutex);
716       src->queued = g_list_append (src->queued, event); /* keep the ref */
717       gst_ipc_pipeline_src_log_queue (src);
718       g_cond_broadcast (&src->create_cond);
719       g_mutex_unlock (&src->comm.mutex);
720     }
721   } else {
722     if (last_ret != GST_FLOW_OK && !upstream) {
723       GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
724           gst_flow_get_name (last_ret));
725       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
726       gst_event_unref (event);
727     } else {
728       GST_DEBUG_OBJECT (src,
729           "This is not a serialized event, pushing in a thread");
730       gst_element_call_async (GST_ELEMENT (src), do_oob_event, event,
731           (GDestroyNotify) gst_event_unref);
732     }
733   }
734 }
735 
736 static void
do_oob_query(GstElement * element,gpointer user_data)737 do_oob_query (GstElement * element, gpointer user_data)
738 {
739   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
740   GstQuery *query = GST_QUERY (user_data);
741   guint32 id;
742   gboolean upstream;
743   gboolean ret;
744 
745   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
746           (query), QUARK_ID));
747   upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
748           (query), QUARK_UPSTREAM));
749 
750   if (upstream) {
751     GstElement *pipeline;
752 
753     if (!(pipeline = find_pipeline (element))) {
754       GST_ERROR_OBJECT (src, "No pipeline found");
755       ret = FALSE;
756     } else {
757       GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT,
758           query);
759       ret = gst_element_query (pipeline, query);
760     }
761   } else {
762     GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query);
763     ret = gst_pad_peer_query (src->srcpad, query);
764     GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret);
765   }
766   gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query);
767 }
768 
769 static void
on_query(guint32 id,GstQuery * query,gboolean upstream,gpointer user_data)770 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
771 {
772   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
773 
774   GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id,
775       query);
776 
777   if (GST_QUERY_IS_SERIALIZED (query) && !upstream) {
778     g_mutex_lock (&src->comm.mutex);
779     src->queued = g_list_append (src->queued, query);   /* keep the ref */
780     gst_ipc_pipeline_src_log_queue (src);
781     g_cond_broadcast (&src->create_cond);
782     g_mutex_unlock (&src->comm.mutex);
783   } else {
784     gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM,
785         GINT_TO_POINTER (upstream), NULL);
786     gst_element_call_async (GST_ELEMENT (src), do_oob_query, query,
787         (GDestroyNotify) gst_query_unref);
788   }
789 }
790 
791 struct StateChangeData
792 {
793   guint32 id;
794   GstStateChange transition;
795 };
796 
797 static void
do_state_change(GstElement * element,gpointer data)798 do_state_change (GstElement * element, gpointer data)
799 {
800   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
801   GstElement *pipeline;
802   GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
803   GstState state, pending, effective;
804   struct StateChangeData *d = data;
805   guint32 id = d->id;
806   GstStateChange transition = d->transition;
807   gboolean down;
808 
809   GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id,
810       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
811       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
812 
813   if (!(pipeline = find_pipeline (element))) {
814     GST_ERROR_OBJECT (src, "No pipeline found");
815     ret = GST_STATE_CHANGE_FAILURE;
816     goto done_nolock;
817   }
818 
819   down = (GST_STATE_TRANSITION_CURRENT (transition) >=
820       GST_STATE_TRANSITION_NEXT (transition));
821 
822   GST_STATE_LOCK (pipeline);
823   ret = gst_element_get_state (pipeline, &state, &pending, 0);
824 
825   /* if we are pending a state change, count the pending state as
826    * the current one */
827   effective = pending == GST_STATE_VOID_PENDING ? state : pending;
828 
829   GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s "
830       "effective:%s", gst_element_state_change_return_get_name (ret),
831       gst_element_state_get_name (state),
832       gst_element_state_get_name (pending),
833       gst_element_state_get_name (effective));
834 
835   if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) ||
836       (GST_STATE_TRANSITION_NEXT (transition) > effective && down)) {
837     /* if the request was to transition to a state that we have already
838      * transitioned to in the same direction, then we just silently return */
839     GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary",
840         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
841     /* make sure we return SUCCESS if the transition is to NULL or READY,
842      * even if our current ret is ASYNC for example; also, make sure not
843      * to return FAILURE, since our state is already committed */
844     if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY ||
845         ret == GST_STATE_CHANGE_FAILURE) {
846       ret = GST_STATE_CHANGE_SUCCESS;
847     }
848   } else if (ret != GST_STATE_CHANGE_FAILURE || down) {
849     /* if the request was to transition to a state that we haven't already
850      * transitioned to in the same direction, then we need to request a state
851      * change in the pipeline, *unless* we are going upwards and the last ret
852      * was FAILURE, in which case we should just return FAILURE and stop.
853      * We don't stop a downwards state change though in case of FAILURE, since
854      * we need to be able to bring the pipeline down to NULL. Note that
855      * GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */
856     ret = gst_element_set_state (pipeline,
857         GST_STATE_TRANSITION_NEXT (transition));
858     GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s",
859         gst_element_state_change_return_get_name (ret));
860   }
861 
862   GST_STATE_UNLOCK (pipeline);
863 
864 done_nolock:
865   GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s",
866       gst_element_state_change_return_get_name (ret));
867   gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret);
868 }
869 
870 static void
on_state_change(guint32 id,GstStateChange transition,gpointer user_data)871 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
872 {
873   struct StateChangeData *d;
874   GstElement *ipcpipelinesrc = GST_ELEMENT (user_data);
875 
876   GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id,
877       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
878       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
879 
880   d = g_new (struct StateChangeData, 1);
881   d->id = id;
882   d->transition = transition;
883 
884   gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free);
885 }
886 
887 static void
on_message(guint32 id,GstMessage * message,gpointer user_data)888 on_message (guint32 id, GstMessage * message, gpointer user_data)
889 {
890   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
891 
892   GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT,
893       id, message);
894   gst_message_unref (message);
895 }
896 
897 static gboolean
gst_ipc_pipeline_src_start_reader_thread(GstIpcPipelineSrc * src)898 gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src)
899 {
900   if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer,
901           on_event, on_query, on_state_change, NULL, on_message, src)) {
902     GST_ERROR_OBJECT (src, "Failed to start reader thread");
903     return FALSE;
904   }
905   return TRUE;
906 }
907 
908 static void
gst_ipc_pipeline_src_stop_reader_thread(GstIpcPipelineSrc * src)909 gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src)
910 {
911   gst_ipc_pipeline_comm_stop_reader_thread (&src->comm);
912 }
913 
914 static void
gst_ipc_pipeline_src_disconnect(GstIpcPipelineSrc * src)915 gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src)
916 {
917   GST_DEBUG_OBJECT (src, "Disconnecting");
918   gst_ipc_pipeline_src_stop_reader_thread (src);
919   src->comm.fdin = -1;
920   src->comm.fdout = -1;
921   gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
922   gst_ipc_pipeline_src_start_reader_thread (src);
923 }
924 
925 static GstStateChangeReturn
gst_ipc_pipeline_src_change_state(GstElement * element,GstStateChange transition)926 gst_ipc_pipeline_src_change_state (GstElement * element,
927     GstStateChange transition)
928 {
929   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
930 
931   switch (transition) {
932     case GST_STATE_CHANGE_NULL_TO_READY:
933       if (src->comm.fdin < 0) {
934         GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin);
935         return GST_STATE_CHANGE_FAILURE;
936       }
937       if (src->comm.fdout < 0) {
938         GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout);
939         return GST_STATE_CHANGE_FAILURE;
940       }
941       if (!src->comm.reader_thread) {
942         GST_ERROR_OBJECT (element, "Failed to start reader thread");
943         return GST_STATE_CHANGE_FAILURE;
944       }
945       break;
946     default:
947       break;
948   }
949   return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
950 }
951