1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
5 * 2014 Tim-Philipp Müller <tim centricular com>
6 * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
7 *
8 * gstipcpipelinesink.c:
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
19 *
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
24 */
25 /**
26 * SECTION:element-ipcpipelinesink
27 * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
28 *
29 * Communicates with an ipcpipelinesrc element in another process via a socket.
30 *
31 * This element, together with ipcpipelinesrc and ipcslavepipeline form a
32 * mechanism that allows splitting a single pipeline in different processes.
33 * The main use-case for it is a playback pipeline split in two parts, where the
34 * first part contains the networking, parsing and demuxing and the second part
35 * contains the decoding and display. The intention of this split is to improve
36 * security of an application, by letting the networking, parsing and demuxing
37 * parts run in a less privileged process than the process that accesses the
38 * decoder and display.
39 *
40 * Once the pipelines in those different processes have been created, the
41 * playback can be controlled entirely from the first pipeline, which is the
42 * one that contains ipcpipelinesink. We call this pipeline the “master”.
43 * All relevant events and queries sent from the application are sent to
44 * the master pipeline and messages to the application are sent from the master
45 * pipeline. The second pipeline, in the other process, is transparently slaved.
46 *
47 * ipcpipelinesink can work only in push mode and does not synchronize buffers
48 * to the clock. Synchronization is meant to happen either at the real sink at
49 * the end of the remote slave pipeline, or not to happen at all, if the
50 * pipeline is live.
51 *
52 * A master pipeline may contain more than one ipcpipelinesink element, which
53 * can be connected either to the same slave pipeline or to different ones.
54 *
55 * Communication with ipcpipelinesrc on the slave happens via a socket, using a
56 * custom protocol. Each buffer, event, query, message or state change is
57 * serialized in a "packet" and sent over the socket. The sender then
58 * performs a blocking wait for a reply, if a return code is needed.
59 *
60 * All objects that contain a GstStructure (messages, queries, events) are
61 * serialized by serializing the GstStructure to a string
62 * (gst_structure_to_string). This implies some limitations, of course.
63 * All fields of these structures that are not serializable to strings (e.g.
64 * object pointers) are ignored, except for some cases where custom
65 * serialization may occur (e.g. error/warning/info messages that contain a
66 * GError are serialized differently).
67 *
68 * Buffers are transported by writing their content directly on the socket.
69 * More efficient ways for memory sharing could be implemented in the future.
70 */
71
72 #ifdef HAVE_CONFIG_H
73 # include "config.h"
74 #endif
75
76 #include "gstipcpipelinesink.h"
77
78 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
79 GST_PAD_SINK,
80 GST_PAD_ALWAYS,
81 GST_STATIC_CAPS_ANY);
82
83 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
84 #define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
85
86 enum
87 {
88 SIGNAL_DISCONNECT,
89 /* FILL ME */
90 LAST_SIGNAL
91 };
92 static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
93
/* GObject property ids; PROP_0 is the unused zero slot. */
enum
{
  PROP_0,
  PROP_FDIN,                    /* "fdin" */
  PROP_FDOUT,                   /* "fdout" */
  PROP_READ_CHUNK_SIZE,         /* "read-chunk-size" */
  PROP_ACK_TIME,                /* "ack-time" */
};
102
103
/* Default for the "read-chunk-size" property. */
#define DEFAULT_READ_CHUNK_SIZE 4096
/* Default for the "ack-time" property: ten seconds as a GTimeSpan value. */
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
106
107 #define _do_init \
108 GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
109 #define gst_ipc_pipeline_sink_parent_class parent_class
110 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
111 GST_TYPE_ELEMENT, _do_init);
112
113 static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
114 const GValue * value, GParamSpec * pspec);
115 static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
116 GValue * value, GParamSpec * pspec);
117 static void gst_ipc_pipeline_sink_dispose (GObject * obj);
118 static void gst_ipc_pipeline_sink_finalize (GObject * obj);
119 static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
120 sink);
121 static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
122 sink);
123
124 static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
125 element, GstStateChange transition);
126
127 static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
128 GstObject * parent, GstBuffer * buffer);
129 static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
130 GstEvent * event);
131 static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
132 GstQuery * query);
133 static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
134 GstEvent * event);
135 static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
136 GstQuery * query);
137 static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
138 GstObject * parent, GstPadMode mode, gboolean active);
139
140
141 static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
142 static void pusher (gpointer data, gpointer user_data);
143
144
145 static void
gst_ipc_pipeline_sink_class_init(GstIpcPipelineSinkClass * klass)146 gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
147 {
148 GObjectClass *gobject_class;
149 GstElementClass *gstelement_class;
150
151 gobject_class = G_OBJECT_CLASS (klass);
152 gstelement_class = GST_ELEMENT_CLASS (klass);
153
154 gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
155 gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
156 gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
157 gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
158
159 g_object_class_install_property (gobject_class, PROP_FDIN,
160 g_param_spec_int ("fdin", "Input file descriptor",
161 "File descriptor to received data from",
162 -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163 g_object_class_install_property (gobject_class, PROP_FDOUT,
164 g_param_spec_int ("fdout", "Output file descriptor",
165 "File descriptor to send data through",
166 -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167 g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
168 g_param_spec_uint ("read-chunk-size", "Read chunk size",
169 "Read chunk size",
170 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
171 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 g_object_class_install_property (gobject_class, PROP_ACK_TIME,
173 g_param_spec_uint64 ("ack-time", "Ack time",
174 "Maximum time to wait for a response to a message",
175 0, G_MAXUINT64, DEFAULT_ACK_TIME,
176 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178 gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
179 g_signal_new ("disconnect",
180 G_TYPE_FROM_CLASS (klass),
181 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
182 G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
183 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
184
185 gst_element_class_set_static_metadata (gstelement_class,
186 "Inter-process Pipeline Sink",
187 "Sink",
188 "Allows splitting and continuing a pipeline in another process",
189 "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
190 gst_element_class_add_pad_template (gstelement_class,
191 gst_static_pad_template_get (&sinktemplate));
192
193 gstelement_class->change_state =
194 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
195 gstelement_class->query =
196 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
197 gstelement_class->send_event =
198 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
199
200 klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
201 }
202
203 static void
gst_ipc_pipeline_sink_init(GstIpcPipelineSink * sink)204 gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
205 {
206 GstPadTemplate *pad_template;
207
208 GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
209
210 gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
211 sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
212 sink->comm.ack_time = DEFAULT_ACK_TIME;
213 sink->comm.fdin = -1;
214 sink->comm.fdout = -1;
215 sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
216 gst_ipc_pipeline_sink_start_reader_thread (sink);
217
218 pad_template =
219 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
220 g_return_if_fail (pad_template != NULL);
221
222 sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
223
224 gst_pad_set_activatemode_function (sink->sinkpad,
225 gst_ipc_pipeline_sink_pad_activate_mode);
226 gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
227 gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
228 gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
229 gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
230
231 }
232
233 static void
gst_ipc_pipeline_sink_dispose(GObject * obj)234 gst_ipc_pipeline_sink_dispose (GObject * obj)
235 {
236 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
237
238 gst_ipc_pipeline_sink_stop_reader_thread (sink);
239 gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
240
241 G_OBJECT_CLASS (parent_class)->dispose (obj);
242 }
243
244 static void
gst_ipc_pipeline_sink_finalize(GObject * obj)245 gst_ipc_pipeline_sink_finalize (GObject * obj)
246 {
247 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
248
249 gst_ipc_pipeline_comm_clear (&sink->comm);
250 g_thread_pool_free (sink->threads, TRUE, TRUE);
251
252 G_OBJECT_CLASS (parent_class)->finalize (obj);
253 }
254
255 static void
gst_ipc_pipeline_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)256 gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
257 const GValue * value, GParamSpec * pspec)
258 {
259 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
260
261 switch (prop_id) {
262 case PROP_FDIN:
263 sink->comm.fdin = g_value_get_int (value);
264 break;
265 case PROP_FDOUT:
266 sink->comm.fdout = g_value_get_int (value);
267 break;
268 case PROP_READ_CHUNK_SIZE:
269 sink->comm.read_chunk_size = g_value_get_uint (value);
270 break;
271 case PROP_ACK_TIME:
272 sink->comm.ack_time = g_value_get_uint64 (value);
273 break;
274 default:
275 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
276 break;
277 }
278 }
279
280 static void
gst_ipc_pipeline_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)281 gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
282 GValue * value, GParamSpec * pspec)
283 {
284 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
285
286 switch (prop_id) {
287 case PROP_FDIN:
288 g_value_set_int (value, sink->comm.fdin);
289 break;
290 case PROP_FDOUT:
291 g_value_set_int (value, sink->comm.fdout);
292 break;
293 case PROP_READ_CHUNK_SIZE:
294 g_value_set_uint (value, sink->comm.read_chunk_size);
295 break;
296 case PROP_ACK_TIME:
297 g_value_set_uint64 (value, sink->comm.ack_time);
298 break;
299 default:
300 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
301 break;
302 }
303 }
304
305 static gboolean
gst_ipc_pipeline_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)306 gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
307 {
308 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
309 gboolean ret;
310
311 GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
312 event, gst_event_type_get_name (event->type), event->type);
313
314 ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
315 gst_event_unref (event);
316 return ret;
317 }
318
319 static GstFlowReturn
gst_ipc_pipeline_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)320 gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
321 GstBuffer * buffer)
322 {
323 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
324 GstFlowReturn ret;
325
326 GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
327
328 ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
329 if (ret != GST_FLOW_OK)
330 GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
331
332 gst_buffer_unref (buffer);
333 return ret;
334 }
335
336 static gboolean
gst_ipc_pipeline_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)337 gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
338 {
339 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
340 gboolean ret;
341
342 GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
343 GST_QUERY_TYPE_NAME (query), query);
344
345 switch (GST_QUERY_TYPE (query)) {
346 case GST_QUERY_ALLOCATION:
347 GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
348 return FALSE;
349 case GST_QUERY_CAPS:
350 {
351 /* caps queries occur even while linking the pipeline.
352 * It is possible that the ipcpipelinesrc may not be connected at this
353 * point, so let's avoid a couple of errors... */
354 GstState state;
355 GST_OBJECT_LOCK (sink);
356 state = GST_STATE (sink);
357 GST_OBJECT_UNLOCK (sink);
358 if (state == GST_STATE_NULL)
359 return FALSE;
360 }
361 default:
362 break;
363 }
364 ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
365
366 return ret;
367 }
368
369 static gboolean
gst_ipc_pipeline_sink_element_query(GstElement * element,GstQuery * query)370 gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
371 {
372 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
373 gboolean ret;
374
375 GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
376 GST_QUERY_TYPE_NAME (query), query);
377
378 ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
379 GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
380 return ret;
381 }
382
383 static gboolean
gst_ipc_pipeline_sink_send_event(GstElement * element,GstEvent * event)384 gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
385 {
386 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
387 gboolean ret;
388
389 GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
390 GST_EVENT_TYPE_NAME (event), event);
391
392 ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
393 GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
394
395 gst_event_unref (event);
396 return ret;
397 }
398
399
400 static gboolean
gst_ipc_pipeline_sink_pad_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)401 gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
402 GstObject * parent, GstPadMode mode, gboolean active)
403 {
404 if (mode == GST_PAD_MODE_PULL)
405 return FALSE;
406 return TRUE;
407 }
408
409 static void
on_buffer(guint32 id,GstBuffer * buffer,gpointer user_data)410 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
411 {
412 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
413 GST_ERROR_OBJECT (sink,
414 "Got buffer id %u! I never knew buffers could go upstream...", id);
415 gst_buffer_unref (buffer);
416 }
417
418 static void
pusher(gpointer data,gpointer user_data)419 pusher (gpointer data, gpointer user_data)
420 {
421 GstIpcPipelineSink *sink = user_data;
422 gboolean ret;
423 guint32 id;
424
425 id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
426 QUARK_ID));
427
428 if (GST_IS_EVENT (data)) {
429 GstEvent *event = GST_EVENT (data);
430 GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
431 ret = gst_pad_push_event (sink->sinkpad, event);
432 GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
433 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
434 } else if (GST_IS_QUERY (data)) {
435 GstQuery *query = GST_QUERY (data);
436 GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
437 ret = gst_pad_peer_query (sink->sinkpad, query);
438 GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
439 gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
440 query);
441 gst_query_unref (query);
442 } else {
443 GST_ERROR_OBJECT (sink, "Unsupported object type");
444 }
445 gst_object_unref (sink);
446 }
447
448 static void
on_event(guint32 id,GstEvent * event,gboolean upstream,gpointer user_data)449 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
450 {
451 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
452
453 if (!upstream) {
454 GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
455 id);
456 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
457 gst_event_unref (event);
458 return;
459 }
460
461 GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
462 gst_object_ref (sink);
463 g_thread_pool_push (sink->threads, event, NULL);
464 }
465
466 static void
on_query(guint32 id,GstQuery * query,gboolean upstream,gpointer user_data)467 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
468 {
469 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
470
471 if (!upstream) {
472 GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
473 id);
474 gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
475 query);
476 gst_query_unref (query);
477 return;
478 }
479
480 GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
481 gst_object_ref (sink);
482 g_thread_pool_push (sink->threads, query, NULL);
483 }
484
485 static void
on_state_change(guint32 id,GstStateChange transition,gpointer user_data)486 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
487 {
488 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
489 GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
490 }
491
492 static void
on_state_lost(gpointer user_data)493 on_state_lost (gpointer user_data)
494 {
495 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
496
497 GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
498
499 GST_OBJECT_LOCK (sink);
500 sink->pass_next_async_done = TRUE;
501 GST_OBJECT_UNLOCK (sink);
502
503 gst_element_lost_state (GST_ELEMENT (sink));
504 }
505
506 static void
do_async_done(GstElement * element,gpointer user_data)507 do_async_done (GstElement * element, gpointer user_data)
508 {
509 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
510 GstMessage *message = user_data;
511
512 GST_STATE_LOCK (sink);
513 GST_OBJECT_LOCK (sink);
514 if (sink->pass_next_async_done) {
515 sink->pass_next_async_done = FALSE;
516 GST_OBJECT_UNLOCK (sink);
517 gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
518 GST_STATE_UNLOCK (sink);
519 gst_element_post_message (element, gst_message_ref (message));
520
521 } else {
522 GST_OBJECT_UNLOCK (sink);
523 GST_STATE_UNLOCK (sink);
524 }
525 }
526
527 static void
on_message(guint32 id,GstMessage * message,gpointer user_data)528 on_message (guint32 id, GstMessage * message, gpointer user_data)
529 {
530 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
531
532 GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
533
534 switch (GST_MESSAGE_TYPE (message)) {
535 case GST_MESSAGE_ASYNC_DONE:
536 GST_OBJECT_LOCK (sink);
537 if (sink->pass_next_async_done) {
538 GST_OBJECT_UNLOCK (sink);
539 gst_element_call_async (GST_ELEMENT (sink), do_async_done,
540 message, (GDestroyNotify) gst_message_unref);
541 } else {
542 GST_OBJECT_UNLOCK (sink);
543 gst_message_unref (message);
544 }
545 return;
546 default:
547 break;
548 }
549
550 gst_element_post_message (GST_ELEMENT (sink), message);
551 }
552
553 static gboolean
gst_ipc_pipeline_sink_start_reader_thread(GstIpcPipelineSink * sink)554 gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
555 {
556 if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
557 on_event, on_query, on_state_change, on_state_lost, on_message,
558 sink)) {
559 GST_ERROR_OBJECT (sink, "Failed to start reader thread");
560 return FALSE;
561 }
562 return TRUE;
563 }
564
565 static void
gst_ipc_pipeline_sink_stop_reader_thread(GstIpcPipelineSink * sink)566 gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
567 {
568 gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
569 }
570
571
572 static void
gst_ipc_pipeline_sink_disconnect(GstIpcPipelineSink * sink)573 gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
574 {
575 GST_DEBUG_OBJECT (sink, "Disconnecting");
576 gst_ipc_pipeline_sink_stop_reader_thread (sink);
577 sink->comm.fdin = -1;
578 sink->comm.fdout = -1;
579 gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
580 gst_ipc_pipeline_sink_start_reader_thread (sink);
581 }
582
583 static GstStateChangeReturn
gst_ipc_pipeline_sink_change_state(GstElement * element,GstStateChange transition)584 gst_ipc_pipeline_sink_change_state (GstElement * element,
585 GstStateChange transition)
586 {
587 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
588 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
589 GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
590 gboolean async = FALSE;
591 gboolean down = FALSE;
592
593 GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
594 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
595 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
596
597 switch (transition) {
598 case GST_STATE_CHANGE_NULL_TO_READY:
599 if (sink->comm.fdin < 0) {
600 GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
601 return GST_STATE_CHANGE_FAILURE;
602 }
603 if (sink->comm.fdout < 0) {
604 GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
605 return GST_STATE_CHANGE_FAILURE;
606 }
607 if (!sink->comm.reader_thread) {
608 GST_ERROR_OBJECT (element, "Failed to start reader thread");
609 return GST_STATE_CHANGE_FAILURE;
610 }
611 break;
612 case GST_STATE_CHANGE_READY_TO_PAUSED:
613 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
614 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
615 /* In these transitions, it is possible that the peer returns ASYNC.
616 * We don't know that in advance, but we post async-start anyway because
617 * it needs to be delivered *before* async-done, and async-done may
618 * arrive at any point in time after we've set the state of the peer.
619 * In case the peer doesn't return ASYNC, we just post async-done
620 * ourselves and the parent GstBin takes care of matching and deleting
621 * them, so the app never gets any of these. */
622 async = TRUE;
623 break;
624 default:
625 break;
626 }
627
628 /* downwards state change */
629 down = (GST_STATE_TRANSITION_CURRENT (transition) >=
630 GST_STATE_TRANSITION_NEXT (transition));
631
632 if (async) {
633 GST_DEBUG_OBJECT (sink,
634 "Posting async-start for %s, will need state-change-done",
635 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
636
637 gst_element_post_message (GST_ELEMENT (sink),
638 gst_message_new_async_start (GST_OBJECT (sink)));
639
640 GST_OBJECT_LOCK (sink);
641 sink->pass_next_async_done = TRUE;
642 GST_OBJECT_UNLOCK (sink);
643 }
644
645 /* change the state of the peer first */
646 /* If the fd out is -1, we do not actually call the peer. This will happen
647 when we explicitely disconnected, and in that case we want to be able
648 to bring the element down to NULL, so it can be restarted with a new
649 slave pipeline. */
650 if (sink->comm.fdout >= 0) {
651 GST_DEBUG_OBJECT (sink, "Calling peer with state change");
652 peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
653 transition);
654 if (peer_ret == GST_STATE_CHANGE_FAILURE && down) {
655 GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
656 "but ignoring because we are going down");
657 peer_ret = GST_STATE_CHANGE_SUCCESS;
658 }
659 } else {
660 if (down) {
661 GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
662 sink->comm.fdout);
663 peer_ret = GST_STATE_CHANGE_SUCCESS;
664 } else {
665 GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
666 sink->comm.fdout);
667 peer_ret = GST_STATE_CHANGE_FAILURE;
668 }
669 }
670
671 /* chain up to the parent class to change our state, if the peer succeeded */
672 if (peer_ret != GST_STATE_CHANGE_FAILURE) {
673 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
674
675 if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
676 GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
677 "but ignoring because we are going down");
678 ret = GST_STATE_CHANGE_SUCCESS;
679 }
680 }
681
682 GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
683 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
684 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
685 gst_element_state_change_return_get_name (peer_ret),
686 gst_element_state_change_return_get_name (ret));
687
688 /* now interpret the return codes */
689 if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
690 GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
691 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
692
693 GST_OBJECT_LOCK (sink);
694 sink->pass_next_async_done = FALSE;
695 GST_OBJECT_UNLOCK (sink);
696
697 gst_element_post_message (GST_ELEMENT (sink),
698 gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
699 } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
700 GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
701 peer_ret = GST_STATE_CHANGE_SUCCESS;
702 }
703
704 if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
705 if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
706 /* only the parent's ret was FAILURE - revert remote changes */
707 GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
708 "returned failure");
709 gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
710 GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
711 GST_STATE_TRANSITION_CURRENT (transition)));
712 }
713 return GST_STATE_CHANGE_FAILURE;
714 }
715
716 /* the parent's (GstElement) state change func won't return ASYNC or
717 * NO_PREROLL, so unless it has returned FAILURE, which we have catched above,
718 * we are not interested in its return code... just return the peer's */
719 return peer_ret;
720 }
721