1 /* GStreamer
2  * Copyright (C) 2015-2017 YouView TV Ltd
3  *   Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
4  *
5  * gstipcpipelinecomm.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26 
27 #include <unistd.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <gst/base/gstbytewriter.h>
31 #include <gst/gstprotection.h>
32 #include "gstipcpipelinecomm.h"
33 
34 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
35 #define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
36 
37 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
38 
39 GQuark QUARK_ID;
40 
/* How the sender of a request waits for the peer's reply. */
typedef enum
{
  /* do not wait at all: gst_ipc_pipeline_comm_sync_fd() returns at once */
  ACK_TYPE_NONE,
  /* wait until comm->ack_time has elapsed, then give up */
  ACK_TYPE_TIMED,
  /* wait for the reply with no deadline */
  ACK_TYPE_BLOCKING
} AckType;
47 
/* Kind of request a reply belongs to.  It selects how the 32-bit reply
 * value is interpreted: a GstFlowReturn for buffers, a
 * GstStateChangeReturn for state changes and a boolean for the rest. */
typedef enum
{
  COMM_REQUEST_TYPE_BUFFER,
  COMM_REQUEST_TYPE_EVENT,
  COMM_REQUEST_TYPE_QUERY,
  COMM_REQUEST_TYPE_STATE_CHANGE,
  COMM_REQUEST_TYPE_MESSAGE,
} CommRequestType;
56 
57 typedef struct
58 {
59   guint32 id;
60   gboolean replied;
61   gboolean comm_error;
62   guint32 ret;
63   GstQuery *query;
64   CommRequestType type;
65   GCond cond;
66 } CommRequest;
67 
68 static const gchar *comm_request_ret_get_name (CommRequestType type,
69     guint32 ret);
70 static guint32 comm_request_ret_get_failure_value (CommRequestType type);
71 
72 static CommRequest *
comm_request_new(guint32 id,CommRequestType type,GstQuery * query)73 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
74 {
75   CommRequest *req;
76 
77   req = g_malloc (sizeof (CommRequest));
78   req->id = id;
79   g_cond_init (&req->cond);
80   req->replied = FALSE;
81   req->comm_error = FALSE;
82   req->query = query;
83   req->ret = comm_request_ret_get_failure_value (type);
84   req->type = type;
85 
86   return req;
87 }
88 
89 static guint32
comm_request_wait(GstIpcPipelineComm * comm,CommRequest * req,AckType ack_type)90 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
91     AckType ack_type)
92 {
93   guint32 ret = comm_request_ret_get_failure_value (req->type);
94   guint64 end_time;
95 
96   if (ack_type == ACK_TYPE_TIMED)
97     end_time = g_get_monotonic_time () + comm->ack_time;
98   else
99     end_time = G_MAXUINT64;
100 
101   GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
102       req->id);
103   while (!req->replied) {
104     if (ack_type == ACK_TYPE_TIMED) {
105       if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
106         break;
107     } else
108       g_cond_wait (&req->cond, &comm->mutex);
109   }
110 
111   if (req->replied) {
112     ret = req->ret;
113     GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
114         req->id, ret, comm_request_ret_get_name (req->type, ret));
115   } else {
116     req->comm_error = TRUE;
117     GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
118         req->id);
119   }
120 
121   return ret;
122 }
123 
124 static void
comm_request_free(CommRequest * req)125 comm_request_free (CommRequest * req)
126 {
127   g_cond_clear (&req->cond);
128   g_free (req);
129 }
130 
131 static const gchar *
comm_request_ret_get_name(CommRequestType type,guint32 ret)132 comm_request_ret_get_name (CommRequestType type, guint32 ret)
133 {
134   switch (type) {
135     case COMM_REQUEST_TYPE_BUFFER:
136       return gst_flow_get_name (ret);
137     case COMM_REQUEST_TYPE_EVENT:
138     case COMM_REQUEST_TYPE_QUERY:
139     case COMM_REQUEST_TYPE_MESSAGE:
140       return ret ? "TRUE" : "FALSE";
141     case COMM_REQUEST_TYPE_STATE_CHANGE:
142       return gst_element_state_change_return_get_name (ret);
143     default:
144       g_assert_not_reached ();
145   }
146 }
147 
148 static guint32
comm_request_ret_get_failure_value(CommRequestType type)149 comm_request_ret_get_failure_value (CommRequestType type)
150 {
151   switch (type) {
152     case COMM_REQUEST_TYPE_BUFFER:
153       return GST_FLOW_COMM_ERROR;
154     case COMM_REQUEST_TYPE_EVENT:
155     case COMM_REQUEST_TYPE_MESSAGE:
156     case COMM_REQUEST_TYPE_QUERY:
157       return FALSE;
158     case COMM_REQUEST_TYPE_STATE_CHANGE:
159       return GST_STATE_CHANGE_FAILURE;
160     default:
161       g_assert_not_reached ();
162   }
163 }
164 
165 static const gchar *
gst_ipc_pipeline_comm_data_type_get_name(GstIpcPipelineCommDataType type)166 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
167 {
168   switch (type) {
169     case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
170       return "ACK";
171     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
172       return "QUERY_RESULT";
173     case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
174       return "BUFFER";
175     case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
176       return "EVENT";
177     case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
178       return "SINK_MESSAGE_EVENT";
179     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
180       return "QUERY";
181     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
182       return "STATE_CHANGE";
183     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
184       return "STATE_LOST";
185     case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
186       return "MESSAGE";
187     case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
188       return "GERROR_MESSAGE";
189     default:
190       return "UNKNOWN";
191   }
192 }
193 
194 static gboolean
gst_ipc_pipeline_comm_sync_fd(GstIpcPipelineComm * comm,guint32 id,GstQuery * query,guint32 * ret,AckType ack_type,CommRequestType type)195 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
196     GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
197 {
198   CommRequest *req;
199   gboolean comm_error;
200   GHashTable *waiting_ids;
201 
202   if (ack_type == ACK_TYPE_NONE)
203     return TRUE;
204 
205   req = comm_request_new (id, type, query);
206   waiting_ids = g_hash_table_ref (comm->waiting_ids);
207   g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
208   *ret = comm_request_wait (comm, req, ack_type);
209   comm_error = req->comm_error;
210   g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
211   g_hash_table_unref (waiting_ids);
212   return !comm_error;
213 }
214 
215 static gboolean
write_to_fd_raw(GstIpcPipelineComm * comm,const void * data,size_t size)216 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
217 {
218   size_t offset;
219   gboolean ret = TRUE;
220 
221   offset = 0;
222   GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size);
223   while (size) {
224     ssize_t written =
225         write (comm->fdout, (const unsigned char *) data + offset, size);
226     if (written < 0) {
227       if (errno == EAGAIN || errno == EINTR)
228         continue;
229       GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
230           strerror (errno));
231       ret = FALSE;
232       goto done;
233     }
234     size -= written;
235     offset += written;
236   }
237 
238 done:
239   return ret;
240 }
241 
242 static gboolean
write_byte_writer_to_fd(GstIpcPipelineComm * comm,GstByteWriter * bw)243 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
244 {
245   guint8 *data;
246   gboolean ret;
247   guint size;
248 
249   size = gst_byte_writer_get_size (bw);
250   data = gst_byte_writer_reset_and_get_data (bw);
251   if (!data)
252     return FALSE;
253   ret = write_to_fd_raw (comm, data, size);
254   g_free (data);
255   return ret;
256 }
257 
258 static void
gst_ipc_pipeline_comm_write_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,guint32 ret,CommRequestType type)259 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
260     guint32 ret, CommRequestType type)
261 {
262   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
263   guint32 size;
264   GstByteWriter bw;
265 
266   g_mutex_lock (&comm->mutex);
267 
268   GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
269       comm_request_ret_get_name (type, ret), ret);
270   gst_byte_writer_init (&bw);
271   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
272     goto write_failed;
273   if (!gst_byte_writer_put_uint32_le (&bw, id))
274     goto write_failed;
275   size = sizeof (ret);
276   if (!gst_byte_writer_put_uint32_le (&bw, size))
277     goto write_failed;
278   if (!gst_byte_writer_put_uint32_le (&bw, ret))
279     goto write_failed;
280 
281   if (!write_byte_writer_to_fd (comm, &bw))
282     goto write_failed;
283 
284 done:
285   g_mutex_unlock (&comm->mutex);
286   gst_byte_writer_reset (&bw);
287   return;
288 
289 write_failed:
290   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
291       ("Failed to write to socket"));
292   goto done;
293 }
294 
295 void
gst_ipc_pipeline_comm_write_flow_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret)296 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
297     guint32 id, GstFlowReturn ret)
298 {
299   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
300       COMM_REQUEST_TYPE_BUFFER);
301 }
302 
303 void
gst_ipc_pipeline_comm_write_boolean_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean ret)304 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
305     guint32 id, gboolean ret)
306 {
307   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
308       COMM_REQUEST_TYPE_EVENT);
309 }
310 
311 void
gst_ipc_pipeline_comm_write_state_change_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstStateChangeReturn ret)312 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
313     guint32 id, GstStateChangeReturn ret)
314 {
315   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
316       COMM_REQUEST_TYPE_STATE_CHANGE);
317 }
318 
319 void
gst_ipc_pipeline_comm_write_query_result_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean result,GstQuery * query)320 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
321     guint32 id, gboolean result, GstQuery * query)
322 {
323   const unsigned char payload_type =
324       GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
325   guint8 result8 = result;
326   guint32 size;
327   size_t len;
328   char *str = NULL;
329   guint32 type;
330   const GstStructure *structure;
331   GstByteWriter bw;
332 
333   g_mutex_lock (&comm->mutex);
334 
335   GST_TRACE_OBJECT (comm->element,
336       "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
337   gst_byte_writer_init (&bw);
338   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
339     goto write_failed;
340   if (!gst_byte_writer_put_uint32_le (&bw, id))
341     goto write_failed;
342   structure = gst_query_get_structure (query);
343   if (structure) {
344     str = gst_structure_to_string (structure);
345     len = strlen (str);
346   } else {
347     str = NULL;
348     len = 0;
349   }
350   size = 1 + sizeof (guint32) + len + 1;
351   if (!gst_byte_writer_put_uint32_le (&bw, size))
352     goto write_failed;
353   if (!gst_byte_writer_put_uint8 (&bw, result8))
354     goto write_failed;
355   type = GST_QUERY_TYPE (query);
356   if (!gst_byte_writer_put_uint32_le (&bw, type))
357     goto write_failed;
358   if (str) {
359     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
360       goto write_failed;
361   } else {
362     if (!gst_byte_writer_put_uint8 (&bw, 0))
363       goto write_failed;
364   }
365 
366   if (!write_byte_writer_to_fd (comm, &bw))
367     goto write_failed;
368 
369 done:
370   g_mutex_unlock (&comm->mutex);
371   gst_byte_writer_reset (&bw);
372   g_free (str);
373   return;
374 
375 write_failed:
376   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
377       ("Failed to write to socket"));
378   goto done;
379 }
380 
381 static gboolean
gst_ipc_pipeline_comm_read_query_result(GstIpcPipelineComm * comm,guint32 size,GstQuery ** query)382 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
383     guint32 size, GstQuery ** query)
384 {
385   gchar *end = NULL;
386   GstStructure *structure;
387   guint8 result;
388   guint32 type;
389   const guint8 *payload = NULL;
390   guint32 mapped_size = size;
391 
392   /* this should not be called if we don't have enough yet */
393   *query = NULL;
394   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
395   g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
396 
397   payload = gst_adapter_map (comm->adapter, mapped_size);
398   if (!payload)
399     return FALSE;
400   result = *payload++;
401   memcpy (&type, payload, sizeof (type));
402   payload += sizeof (type);
403 
404   size -= 1 + sizeof (guint32);
405   if (size == 0)
406     goto done;
407 
408   if (payload[size - 1]) {
409     result = FALSE;
410     goto done;
411   }
412   if (*payload) {
413     structure = gst_structure_from_string ((const char *) payload, &end);
414   } else {
415     structure = NULL;
416   }
417   if (!structure) {
418     result = FALSE;
419     goto done;
420   }
421 
422   *query = gst_query_new_custom (type, structure);
423 
424 done:
425   gst_adapter_unmap (comm->adapter);
426   gst_adapter_flush (comm->adapter, mapped_size);
427   return result;
428 }
429 
430 typedef struct
431 {
432   guint32 bytes;
433 
434   guint64 size;
435   guint32 flags;
436   guint64 api;
437   char *str;
438 } MetaBuildInfo;
439 
440 typedef struct
441 {
442   GstIpcPipelineComm *comm;
443   guint32 n_meta;
444   guint32 total_bytes;
445   MetaBuildInfo *info;
446 } MetaListRepresentation;
447 
448 static gboolean
build_meta(GstBuffer * buffer,GstMeta ** meta,gpointer user_data)449 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
450 {
451   MetaListRepresentation *repr = user_data;
452 
453   repr->n_meta++;
454   repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
455   repr->info[repr->n_meta - 1].bytes =
456       /* 4 byte bytes */
457       4
458       /* 4 byte GstMetaFlags */
459       + 4
460       /* GstMetaInfo::api */
461       + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
462       /* GstMetaInfo::size */
463       + 8
464       /* str length */
465       + 4;
466 
467   repr->info[repr->n_meta - 1].flags = (*meta)->flags;
468   repr->info[repr->n_meta - 1].api = (*meta)->info->api;
469   repr->info[repr->n_meta - 1].size = (*meta)->info->size;
470   repr->info[repr->n_meta - 1].str = NULL;
471 
472   /* GstMeta is a base class, and actual useful classes are all different...
473      So we list a few of them we know we want and ignore the open ended rest */
474   if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
475     GstProtectionMeta *m = (GstProtectionMeta *) * meta;
476     repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
477     repr->info[repr->n_meta - 1].bytes +=
478         strlen (repr->info[repr->n_meta - 1].str) + 1;
479     GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
480         g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
481   } else {
482     GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
483         g_type_name ((*meta)->info->api));
484   }
485   repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
486   return TRUE;
487 }
488 
489 typedef struct
490 {
491   guint64 pts;
492   guint64 dts;
493   guint64 duration;
494   guint64 offset;
495   guint64 offset_end;
496   guint64 flags;
497 } CommBufferMetadata;
498 
499 GstFlowReturn
gst_ipc_pipeline_comm_write_buffer_to_fd(GstIpcPipelineComm * comm,GstBuffer * buffer)500 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
501     GstBuffer * buffer)
502 {
503   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
504   GstMapInfo map;
505   guint32 ret32 = GST_FLOW_OK;
506   guint32 size, n;
507   CommBufferMetadata meta;
508   GstFlowReturn ret;
509   MetaListRepresentation repr = { comm, 0, 4, NULL };   /* starts a 4 for n_meta */
510   GstByteWriter bw;
511 
512   g_mutex_lock (&comm->mutex);
513   ++comm->send_id;
514 
515   GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
516       comm->send_id, buffer);
517 
518   gst_byte_writer_init (&bw);
519 
520   meta.pts = GST_BUFFER_PTS (buffer);
521   meta.dts = GST_BUFFER_DTS (buffer);
522   meta.duration = GST_BUFFER_DURATION (buffer);
523   meta.offset = GST_BUFFER_OFFSET (buffer);
524   meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
525   meta.flags = GST_BUFFER_FLAGS (buffer);
526 
527   /* work out meta size */
528   gst_buffer_foreach_meta (buffer, build_meta, &repr);
529 
530   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
531     goto write_failed;
532   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
533     goto write_failed;
534   size =
535       gst_buffer_get_size (buffer) + sizeof (guint32) +
536       sizeof (CommBufferMetadata) + repr.total_bytes;
537   if (!gst_byte_writer_put_uint32_le (&bw, size))
538     goto write_failed;
539   if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
540     goto write_failed;
541   size = gst_buffer_get_size (buffer);
542   if (!gst_byte_writer_put_uint32_le (&bw, size))
543     goto write_failed;
544   if (!write_byte_writer_to_fd (comm, &bw))
545     goto write_failed;
546 
547   if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
548     goto map_failed;
549   ret = write_to_fd_raw (comm, map.data, map.size);
550   gst_buffer_unmap (buffer, &map);
551   if (!ret)
552     goto write_failed;
553 
554   /* meta */
555   gst_byte_writer_init (&bw);
556   if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
557     goto write_failed;
558   for (n = 0; n < repr.n_meta; ++n) {
559     const MetaBuildInfo *info = repr.info + n;
560     guint32 len;
561     const char *s;
562 
563     if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
564       goto write_failed;
565 
566     if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
567       goto write_failed;
568 
569     s = g_type_name (info->api);
570     len = strlen (s) + 1;
571     if (!gst_byte_writer_put_uint32_le (&bw, len))
572       goto write_failed;
573     if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
574       goto write_failed;
575 
576     if (!gst_byte_writer_put_uint64_le (&bw, info->size))
577       goto write_failed;
578 
579     s = info->str;
580     len = s ? (strlen (s) + 1) : 0;
581     if (!gst_byte_writer_put_uint32_le (&bw, len))
582       goto write_failed;
583     if (len)
584       if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
585         goto write_failed;
586   }
587 
588   if (!write_byte_writer_to_fd (comm, &bw))
589     goto write_failed;
590 
591   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
592           ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
593     goto wait_failed;
594   ret = ret32;
595 
596 done:
597   g_mutex_unlock (&comm->mutex);
598   gst_byte_writer_reset (&bw);
599   for (n = 0; n < repr.n_meta; ++n)
600     g_free (repr.info[n].str);
601   g_free (repr.info);
602   return ret;
603 
604 write_failed:
605   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
606       ("Failed to write to socket"));
607   ret = GST_FLOW_COMM_ERROR;
608   goto done;
609 
610 wait_failed:
611   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
612       ("Failed to wait for reply on socket"));
613   ret = GST_FLOW_COMM_ERROR;
614   goto done;
615 
616 map_failed:
617   GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
618       ("Failed to map buffer"));
619   ret = GST_FLOW_ERROR;
620   goto done;
621 }
622 
623 static GstBuffer *
gst_ipc_pipeline_comm_read_buffer(GstIpcPipelineComm * comm,guint32 size)624 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
625 {
626   GstBuffer *buffer;
627   CommBufferMetadata meta;
628   guint32 n_meta, n;
629   const guint8 *payload = NULL;
630   guint32 mapped_size, buffer_data_size;
631 
632   /* this should not be called if we don't have enough yet */
633   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
634   g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
635 
636   mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
637   payload = gst_adapter_map (comm->adapter, mapped_size);
638   if (!payload)
639     return NULL;
640   memcpy (&meta, payload, sizeof (CommBufferMetadata));
641   payload += sizeof (CommBufferMetadata);
642   memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
643   size -= mapped_size;
644   gst_adapter_unmap (comm->adapter);
645   gst_adapter_flush (comm->adapter, mapped_size);
646 
647   if (buffer_data_size == 0) {
648     buffer = gst_buffer_new ();
649   } else {
650     buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
651     gst_adapter_flush (comm->adapter, buffer_data_size);
652   }
653   size -= buffer_data_size;
654 
655   GST_BUFFER_PTS (buffer) = meta.pts;
656   GST_BUFFER_DTS (buffer) = meta.dts;
657   GST_BUFFER_DURATION (buffer) = meta.duration;
658   GST_BUFFER_OFFSET (buffer) = meta.offset;
659   GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
660   GST_BUFFER_FLAGS (buffer) = meta.flags;
661 
662   /* If you don't call that, the GType isn't yet known at the
663      g_type_from_name below */
664   gst_protection_meta_get_info ();
665 
666   mapped_size = size;
667   payload = gst_adapter_map (comm->adapter, mapped_size);
668   if (!payload) {
669     gst_buffer_unref (buffer);
670     return NULL;
671   }
672   memcpy (&n_meta, payload, sizeof (n_meta));
673   payload += sizeof (n_meta);
674 
675   for (n = 0; n < n_meta; ++n) {
676     guint32 flags, len, bytes;
677     guint64 msize;
678     GType api;
679     GstMeta *meta;
680     GstStructure *structure = NULL;
681 
682     memcpy (&bytes, payload, sizeof (bytes));
683     payload += sizeof (bytes);
684 
685 #define READ_FIELD(f) do { \
686     memcpy (&f, payload, sizeof (f)); \
687     payload += sizeof(f); \
688     } while(0)
689 
690     READ_FIELD (flags);
691     READ_FIELD (len);
692     api = g_type_from_name ((const char *) payload);
693     payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
694     READ_FIELD (msize);
695     READ_FIELD (len);
696     if (len) {
697       structure = gst_structure_new_from_string ((const char *) payload);
698       payload += len + 1;
699     }
700 
701     /* Seems we can add a meta from the api nor type ? */
702     if (api == GST_PROTECTION_META_API_TYPE) {
703       meta =
704           gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
705       ((GstProtectionMeta *) meta)->info = structure;
706     } else {
707       GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
708           g_type_name (api));
709       if (structure)
710         gst_structure_free (structure);
711     }
712 
713 #undef READ_FIELD
714 
715   }
716 
717   gst_adapter_unmap (comm->adapter);
718   gst_adapter_flush (comm->adapter, mapped_size);
719 
720   return buffer;
721 }
722 
723 static gboolean
gst_ipc_pipeline_comm_write_sink_message_event_to_fd(GstIpcPipelineComm * comm,GstEvent * event)724 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
725     GstEvent * event)
726 {
727   const unsigned char payload_type =
728       GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
729   gboolean ret;
730   guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
731   char *str = NULL;
732   const GstStructure *structure;
733   GstMessage *message = NULL;
734   const char *name;
735   GstByteWriter bw;
736 
737   g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
738       FALSE);
739 
740   g_mutex_lock (&comm->mutex);
741   ++comm->send_id;
742 
743   GST_TRACE_OBJECT (comm->element,
744       "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
745 
746   gst_byte_writer_init (&bw);
747   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
748     goto write_failed;
749   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
750     goto write_failed;
751   name = gst_structure_get_name (gst_event_get_structure (event));
752   slen = strlen (name) + 1;
753   gst_event_parse_sink_message (event, &message);
754   structure = gst_message_get_structure (message);
755   if (structure) {
756     str = gst_structure_to_string (structure);
757     structure_slen = strlen (str);
758   } else {
759     str = NULL;
760     structure_slen = 0;
761   }
762   size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
763       strlen (name) + 1 + structure_slen + 1;
764   if (!gst_byte_writer_put_uint32_le (&bw, size))
765     goto write_failed;
766 
767   type = GST_MESSAGE_TYPE (message);
768   if (!gst_byte_writer_put_uint32_le (&bw, type))
769     goto write_failed;
770   size -= sizeof (type);
771 
772   eseqnum = GST_EVENT_SEQNUM (event);
773   if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
774     goto write_failed;
775   size -= sizeof (eseqnum);
776 
777   mseqnum = GST_MESSAGE_SEQNUM (message);
778   if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
779     goto write_failed;
780   size -= sizeof (mseqnum);
781 
782   if (!gst_byte_writer_put_uint32_le (&bw, slen))
783     goto write_failed;
784   size -= sizeof (slen);
785 
786   if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
787     goto write_failed;
788   size -= slen;
789 
790   if (str) {
791     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
792       goto write_failed;
793   } else {
794     if (!gst_byte_writer_put_uint8 (&bw, 0))
795       goto write_failed;
796   }
797 
798   if (!write_byte_writer_to_fd (comm, &bw))
799     goto write_failed;
800 
801   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
802           GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
803           COMM_REQUEST_TYPE_EVENT))
804     goto write_failed;
805 
806   ret = ret32;
807 
808 done:
809   g_mutex_unlock (&comm->mutex);
810   gst_byte_writer_reset (&bw);
811   g_free (str);
812   if (message)
813     gst_message_unref (message);
814   return ret;
815 
816 write_failed:
817   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
818       ("Failed to write to socket"));
819   ret = FALSE;
820   goto done;
821 }
822 
823 static GstEvent *
gst_ipc_pipeline_comm_read_sink_message_event(GstIpcPipelineComm * comm,guint32 size)824 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
825     guint32 size)
826 {
827   GstMessage *message;
828   GstEvent *event = NULL;
829   gchar *end = NULL;
830   GstStructure *structure;
831   guint32 type, eseqnum, mseqnum, slen;
832   const char *name;
833   guint32 mapped_size = size;
834   const guint8 *payload;
835 
836   /* this should not be called if we don't have enough yet */
837   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
838   g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
839 
840   payload = gst_adapter_map (comm->adapter, mapped_size);
841   if (!payload)
842     return NULL;
843   memcpy (&type, payload, sizeof (type));
844   payload += sizeof (type);
845   size -= sizeof (type);
846   if (size == 0)
847     goto done;
848 
849   memcpy (&eseqnum, payload, sizeof (eseqnum));
850   payload += sizeof (eseqnum);
851   size -= sizeof (eseqnum);
852   if (size == 0)
853     goto done;
854 
855   memcpy (&mseqnum, payload, sizeof (mseqnum));
856   payload += sizeof (mseqnum);
857   size -= sizeof (mseqnum);
858   if (size == 0)
859     goto done;
860 
861   memcpy (&slen, payload, sizeof (slen));
862   payload += sizeof (slen);
863   size -= sizeof (slen);
864   if (size == 0)
865     goto done;
866 
867   if (payload[slen - 1])
868     goto done;
869   name = (const char *) payload;
870   payload += slen;
871   size -= slen;
872 
873   if ((payload)[size - 1]) {
874     goto done;
875   }
876   if (*payload) {
877     structure = gst_structure_from_string ((const char *) payload, &end);
878   } else {
879     structure = NULL;
880   }
881 
882   message =
883       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
884   gst_message_set_seqnum (message, mseqnum);
885   event = gst_event_new_sink_message (name, message);
886   gst_event_set_seqnum (event, eseqnum);
887   gst_message_unref (message);
888 
889 done:
890   gst_adapter_unmap (comm->adapter);
891   gst_adapter_flush (comm->adapter, mapped_size);
892   return event;
893 }
894 
895 gboolean
gst_ipc_pipeline_comm_write_event_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstEvent * event)896 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
897     gboolean upstream, GstEvent * event)
898 {
899   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
900   gboolean ret;
901   guint32 type, size, ret32 = TRUE, seqnum, slen;
902   char *str = NULL;
903   const GstStructure *structure;
904   GstByteWriter bw;
905 
906   /* we special case sink-message event as gst can't serialize/de-serialize it */
907   if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
908     return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
909 
910   g_mutex_lock (&comm->mutex);
911   ++comm->send_id;
912 
913   GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
914       comm->send_id, event);
915 
916   gst_byte_writer_init (&bw);
917   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
918     goto write_failed;
919   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
920     goto write_failed;
921   structure = gst_event_get_structure (event);
922   if (structure) {
923 
924     if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
925       GstStructure *s = gst_structure_copy (structure);
926       gst_structure_remove_field (s, "stream");
927       str = gst_structure_to_string (s);
928       gst_structure_free (s);
929     } else {
930       str = gst_structure_to_string (structure);
931     }
932 
933     slen = strlen (str);
934   } else {
935     str = NULL;
936     slen = 0;
937   }
938   size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
939   if (!gst_byte_writer_put_uint32_le (&bw, size))
940     goto write_failed;
941 
942   type = GST_EVENT_TYPE (event);
943   if (!gst_byte_writer_put_uint32_le (&bw, type))
944     goto write_failed;
945 
946   seqnum = GST_EVENT_SEQNUM (event);
947   if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
948     goto write_failed;
949 
950   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
951     goto write_failed;
952 
953   if (str) {
954     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
955       goto write_failed;
956   } else {
957     if (!gst_byte_writer_put_uint8 (&bw, 0))
958       goto write_failed;
959   }
960 
961   if (!write_byte_writer_to_fd (comm, &bw))
962     goto write_failed;
963 
964   /* Upstream events get serialized, this is required to send seeks only
965    * one at a time. */
966   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
967           (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
968           ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
969     goto write_failed;
970   ret = ret32;
971 
972 done:
973   g_mutex_unlock (&comm->mutex);
974   g_free (str);
975   gst_byte_writer_reset (&bw);
976   return ret;
977 
978 write_failed:
979   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
980       ("Failed to write to socket"));
981   ret = FALSE;
982   goto done;
983 }
984 
985 static GstEvent *
gst_ipc_pipeline_comm_read_event(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)986 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
987     gboolean * upstream)
988 {
989   GstEvent *event = NULL;
990   gchar *end = NULL;
991   GstStructure *structure;
992   guint32 type, seqnum;
993   guint32 mapped_size = size;
994   const guint8 *payload;
995 
996   /* this should not be called if we don't have enough yet */
997   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
998   g_return_val_if_fail (size >= sizeof (type), NULL);
999 
1000   payload = gst_adapter_map (comm->adapter, mapped_size);
1001   if (!payload)
1002     return NULL;
1003 
1004   memcpy (&type, payload, sizeof (type));
1005   payload += sizeof (type);
1006   size -= sizeof (type);
1007   if (size == 0)
1008     goto done;
1009 
1010   memcpy (&seqnum, payload, sizeof (seqnum));
1011   payload += sizeof (seqnum);
1012   size -= sizeof (seqnum);
1013   if (size == 0)
1014     goto done;
1015 
1016   *upstream = (*payload) ? TRUE : FALSE;
1017   payload += 1;
1018   size -= 1;
1019   if (size == 0)
1020     goto done;
1021 
1022   if (payload[size - 1])
1023     goto done;
1024   if (*payload) {
1025     structure = gst_structure_from_string ((const char *) payload, &end);
1026   } else {
1027     structure = NULL;
1028   }
1029 
1030   event = gst_event_new_custom (type, structure);
1031   gst_event_set_seqnum (event, seqnum);
1032 
1033 done:
1034   gst_adapter_unmap (comm->adapter);
1035   gst_adapter_flush (comm->adapter, mapped_size);
1036   return event;
1037 }
1038 
1039 gboolean
gst_ipc_pipeline_comm_write_query_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstQuery * query)1040 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1041     gboolean upstream, GstQuery * query)
1042 {
1043   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1044   gboolean ret;
1045   guint32 type, size, ret32 = TRUE, slen;
1046   char *str = NULL;
1047   const GstStructure *structure;
1048   GstByteWriter bw;
1049 
1050   g_mutex_lock (&comm->mutex);
1051   ++comm->send_id;
1052 
1053   GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1054       comm->send_id, query);
1055 
1056   gst_byte_writer_init (&bw);
1057   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1058     goto write_failed;
1059   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1060     goto write_failed;
1061   structure = gst_query_get_structure (query);
1062   if (structure) {
1063     str = gst_structure_to_string (structure);
1064     slen = strlen (str);
1065   } else {
1066     str = NULL;
1067     slen = 0;
1068   }
1069   size = sizeof (type) + 1 + slen + 1;
1070   if (!gst_byte_writer_put_uint32_le (&bw, size))
1071     goto write_failed;
1072 
1073   type = GST_QUERY_TYPE (query);
1074   if (!gst_byte_writer_put_uint32_le (&bw, type))
1075     goto write_failed;
1076 
1077   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1078     goto write_failed;
1079 
1080   if (str) {
1081     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1082       goto write_failed;
1083   } else {
1084     if (!gst_byte_writer_put_uint8 (&bw, 0))
1085       goto write_failed;
1086   }
1087 
1088   if (!write_byte_writer_to_fd (comm, &bw))
1089     goto write_failed;
1090 
1091   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1092           GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1093           COMM_REQUEST_TYPE_QUERY))
1094     goto write_failed;
1095 
1096   ret = ret32;
1097 
1098 done:
1099   g_mutex_unlock (&comm->mutex);
1100   g_free (str);
1101   gst_byte_writer_reset (&bw);
1102   return ret;
1103 
1104 write_failed:
1105   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1106       ("Failed to write to socket"));
1107   ret = FALSE;
1108   goto done;
1109 }
1110 
1111 static GstQuery *
gst_ipc_pipeline_comm_read_query(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)1112 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1113     gboolean * upstream)
1114 {
1115   GstQuery *query = NULL;
1116   gchar *end = NULL;
1117   GstStructure *structure;
1118   guint32 type;
1119   guint32 mapped_size = size;
1120   const guint8 *payload;
1121 
1122   /* this should not be called if we don't have enough yet */
1123   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1124   g_return_val_if_fail (size >= sizeof (type), NULL);
1125 
1126   payload = gst_adapter_map (comm->adapter, mapped_size);
1127   if (!payload)
1128     return NULL;
1129 
1130   memcpy (&type, payload, sizeof (type));
1131   payload += sizeof (type);
1132   size -= sizeof (type);
1133   if (size == 0)
1134     goto done;
1135 
1136   *upstream = (*payload) ? TRUE : FALSE;
1137   payload += 1;
1138   size -= 1;
1139   if (size == 0)
1140     goto done;
1141 
1142   if (payload[size - 1])
1143     goto done;
1144   if (*payload) {
1145     structure = gst_structure_from_string ((const char *) payload, &end);
1146   } else {
1147     structure = NULL;
1148   }
1149 
1150   query = gst_query_new_custom (type, structure);
1151 
1152   /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1153      This does not play well with the serialization/deserialization system,
1154      which will give us a non-NULL GstCaps which has a value of NULL. This
1155      in turn wreaks havoc with any code that tests whether filter is NULL
1156      (which basically means, am I being given an optional GstCaps ?).
1157      So we look for non-NULL GstCaps which have NULL contents, and replace
1158      them with NULL instead. */
1159   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1160     GstCaps *filter;
1161     gst_query_parse_caps (query, &filter);
1162     if (filter
1163         && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1164             "NULL")) {
1165       gst_query_unref (query);
1166       query = gst_query_new_caps (NULL);
1167     }
1168   }
1169 
1170 done:
1171   gst_adapter_unmap (comm->adapter);
1172   gst_adapter_flush (comm->adapter, mapped_size);
1173   return query;
1174 }
1175 
1176 GstStateChangeReturn
gst_ipc_pipeline_comm_write_state_change_to_fd(GstIpcPipelineComm * comm,GstStateChange transition)1177 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1178     GstStateChange transition)
1179 {
1180   const unsigned char payload_type =
1181       GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1182   GstStateChangeReturn ret;
1183   guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1184   GstByteWriter bw;
1185 
1186   g_mutex_lock (&comm->mutex);
1187   ++comm->send_id;
1188 
1189   GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1190       comm->send_id,
1191       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1192       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1193 
1194   gst_byte_writer_init (&bw);
1195   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1196     goto write_failed;
1197   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1198     goto write_failed;
1199   size = sizeof (transition);
1200   if (!gst_byte_writer_put_uint32_le (&bw, size))
1201     goto write_failed;
1202   if (!gst_byte_writer_put_uint32_le (&bw, transition))
1203     goto write_failed;
1204 
1205   if (!write_byte_writer_to_fd (comm, &bw))
1206     goto write_failed;
1207 
1208   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1209           ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1210     goto write_failed;
1211   ret = ret32;
1212 
1213 done:
1214   g_mutex_unlock (&comm->mutex);
1215   gst_byte_writer_reset (&bw);
1216   return ret;
1217 
1218 write_failed:
1219   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1220       ("Failed to write to socket"));
1221   ret = GST_STATE_CHANGE_FAILURE;
1222   goto done;
1223 }
1224 
1225 static gboolean
is_valid_state_change(GstStateChange transition)1226 is_valid_state_change (GstStateChange transition)
1227 {
1228   if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1229     return TRUE;
1230   if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1231     return TRUE;
1232   if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1233     return TRUE;
1234   if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1235     return TRUE;
1236   if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1237     return TRUE;
1238   if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1239     return TRUE;
1240   if (GST_STATE_TRANSITION_CURRENT (transition) ==
1241       GST_STATE_TRANSITION_NEXT (transition))
1242     return TRUE;
1243   return FALSE;
1244 }
1245 
1246 static gboolean
gst_ipc_pipeline_comm_read_state_change(GstIpcPipelineComm * comm,guint32 size,guint32 * transition)1247 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1248     guint32 size, guint32 * transition)
1249 {
1250   guint32 mapped_size = size;
1251   const guint8 *payload;
1252 
1253   /* this should not be called if we don't have enough yet */
1254   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1255   g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1256 
1257   payload = gst_adapter_map (comm->adapter, size);
1258   if (!payload)
1259     return FALSE;
1260   memcpy (transition, payload, sizeof (*transition));
1261   gst_adapter_unmap (comm->adapter);
1262   gst_adapter_flush (comm->adapter, mapped_size);
1263   return is_valid_state_change (*transition);
1264 }
1265 
1266 void
gst_ipc_pipeline_comm_write_state_lost_to_fd(GstIpcPipelineComm * comm)1267 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1268 {
1269   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1270   guint32 size;
1271   GstByteWriter bw;
1272 
1273   g_mutex_lock (&comm->mutex);
1274   ++comm->send_id;
1275 
1276   GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1277   gst_byte_writer_init (&bw);
1278   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1279     goto write_failed;
1280   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1281     goto write_failed;
1282   size = 0;
1283   if (!gst_byte_writer_put_uint32_le (&bw, size))
1284     goto write_failed;
1285 
1286   if (!write_byte_writer_to_fd (comm, &bw))
1287     goto write_failed;
1288 
1289 done:
1290   g_mutex_unlock (&comm->mutex);
1291   gst_byte_writer_reset (&bw);
1292   return;
1293 
1294 write_failed:
1295   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1296       ("Failed to write to socket"));
1297   goto done;
1298 }
1299 
1300 static gboolean
gst_ipc_pipeline_comm_read_state_lost(GstIpcPipelineComm * comm,guint32 size)1301 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
1302 {
1303   /* no payload */
1304   return TRUE;
1305 }
1306 
1307 static gboolean
gst_ipc_pipeline_comm_write_gerror_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1308 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1309     GstMessage * message)
1310 {
1311   const unsigned char payload_type =
1312       GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1313   gboolean ret;
1314   guint32 code, size, ret32 = TRUE;
1315   char *str = NULL;
1316   GError *error;
1317   char *extra_message;
1318   const char *domain_string;
1319   unsigned char msgtype;
1320   GstByteWriter bw;
1321 
1322   g_mutex_lock (&comm->mutex);
1323   ++comm->send_id;
1324 
1325   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1326     gst_message_parse_error (message, &error, &extra_message);
1327     msgtype = 2;
1328   } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1329     gst_message_parse_warning (message, &error, &extra_message);
1330     msgtype = 1;
1331   } else {
1332     gst_message_parse_info (message, &error, &extra_message);
1333     msgtype = 0;
1334   }
1335   code = error->code;
1336   domain_string = g_quark_to_string (error->domain);
1337   GST_TRACE_OBJECT (comm->element,
1338       "Writing error %u: domain %s, code %u, message %s, extra message %s",
1339       comm->send_id, domain_string, error->code, error->message, extra_message);
1340 
1341   gst_byte_writer_init (&bw);
1342   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1343     goto write_failed;
1344   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1345     goto write_failed;
1346 
1347   size = sizeof (size);
1348   size += 1;
1349   size += strlen (domain_string) + 1;
1350   size += sizeof (code);
1351   size += sizeof (size);
1352   size += error->message ? strlen (error->message) + 1 : 0;
1353   size += sizeof (size);
1354   size += extra_message ? strlen (extra_message) + 1 : 0;
1355 
1356   if (!gst_byte_writer_put_uint32_le (&bw, size))
1357     goto write_failed;
1358 
1359   if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1360     goto write_failed;
1361   size = strlen (domain_string) + 1;
1362   if (!gst_byte_writer_put_uint32_le (&bw, size))
1363     goto write_failed;
1364   if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1365     goto write_failed;
1366   if (!gst_byte_writer_put_uint32_le (&bw, code))
1367     goto write_failed;
1368   size = error->message ? strlen (error->message) + 1 : 0;
1369   if (!gst_byte_writer_put_uint32_le (&bw, size))
1370     goto write_failed;
1371   if (error->message) {
1372     if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1373       goto write_failed;
1374   }
1375   size = extra_message ? strlen (extra_message) + 1 : 0;
1376   if (!gst_byte_writer_put_uint32_le (&bw, size))
1377     goto write_failed;
1378   if (extra_message) {
1379     if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1380       goto write_failed;
1381   }
1382 
1383   if (!write_byte_writer_to_fd (comm, &bw))
1384     goto write_failed;
1385 
1386   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1387           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1388     goto write_failed;
1389 
1390   ret = ret32;
1391 
1392 done:
1393   g_mutex_unlock (&comm->mutex);
1394   g_free (str);
1395   g_error_free (error);
1396   g_free (extra_message);
1397   gst_byte_writer_reset (&bw);
1398   return ret;
1399 
1400 write_failed:
1401   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1402       ("Failed to write to socket"));
1403   ret = FALSE;
1404   goto done;
1405 }
1406 
1407 static GstMessage *
gst_ipc_pipeline_comm_read_gerror_message(GstIpcPipelineComm * comm,guint32 size)1408 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1409     guint32 size)
1410 {
1411   GstMessage *message = NULL;
1412   guint32 code;
1413   GQuark domain;
1414   const char *msg, *extra_message;
1415   GError *error;
1416   unsigned char msgtype;
1417   guint32 mapped_size = size;
1418   const guint8 *payload;
1419 
1420   /* this should not be called if we don't have enough yet */
1421   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1422   g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1423       NULL);
1424 
1425   payload = gst_adapter_map (comm->adapter, mapped_size);
1426   if (!payload)
1427     return NULL;
1428   msgtype = *payload++;
1429   memcpy (&size, payload, sizeof (size));
1430   payload += sizeof (size);
1431   if (payload[size - 1])
1432     goto done;
1433   domain = g_quark_from_string ((const char *) payload);
1434   payload += size;
1435 
1436   memcpy (&code, payload, sizeof (code));
1437   payload += sizeof (code);
1438 
1439   memcpy (&size, payload, sizeof (size));
1440   payload += sizeof (size);
1441   if (size) {
1442     if (payload[size - 1])
1443       goto done;
1444     msg = (const char *) payload;
1445   } else {
1446     msg = NULL;
1447   }
1448   payload += size;
1449 
1450   memcpy (&size, payload, sizeof (size));
1451   payload += sizeof (size);
1452   if (size) {
1453     if (payload[size - 1])
1454       goto done;
1455     extra_message = (const char *) payload;
1456   } else {
1457     extra_message = NULL;
1458   }
1459   payload += size;
1460 
1461   error = g_error_new (domain, code, "%s", msg);
1462   if (msgtype == 2)
1463     message =
1464         gst_message_new_error (GST_OBJECT (comm->element), error,
1465         extra_message);
1466   else if (msgtype == 1)
1467     message =
1468         gst_message_new_warning (GST_OBJECT (comm->element), error,
1469         extra_message);
1470   else
1471     message =
1472         gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1473   g_error_free (error);
1474 
1475 done:
1476   gst_adapter_unmap (comm->adapter);
1477   gst_adapter_flush (comm->adapter, mapped_size);
1478 
1479   return message;
1480 }
1481 
1482 gboolean
gst_ipc_pipeline_comm_write_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1483 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1484     GstMessage * message)
1485 {
1486   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1487   gboolean ret;
1488   guint32 type, size, ret32 = TRUE, slen;
1489   char *str = NULL;
1490   const GstStructure *structure;
1491   GstByteWriter bw;
1492 
1493   /* we special case error as gst can't serialize/de-serialize it */
1494   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1495       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1496       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1497     return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1498 
1499   g_mutex_lock (&comm->mutex);
1500   ++comm->send_id;
1501 
1502   GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1503       comm->send_id, message);
1504 
1505   gst_byte_writer_init (&bw);
1506   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1507     goto write_failed;
1508   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1509     goto write_failed;
1510   structure = gst_message_get_structure (message);
1511   if (structure) {
1512     str = gst_structure_to_string (structure);
1513     slen = strlen (str);
1514   } else {
1515     str = NULL;
1516     slen = 0;
1517   }
1518   size = sizeof (type) + slen + 1;
1519   if (!gst_byte_writer_put_uint32_le (&bw, size))
1520     goto write_failed;
1521 
1522   type = GST_MESSAGE_TYPE (message);
1523   if (!gst_byte_writer_put_uint32_le (&bw, type))
1524     goto write_failed;
1525   size -= sizeof (type);
1526   if (str) {
1527     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1528       goto write_failed;
1529   } else {
1530     if (!gst_byte_writer_put_uint8 (&bw, 0))
1531       goto write_failed;
1532   }
1533 
1534   if (!write_byte_writer_to_fd (comm, &bw))
1535     goto write_failed;
1536 
1537   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1538           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1539     goto write_failed;
1540 
1541   ret = ret32;
1542 
1543 done:
1544   g_mutex_unlock (&comm->mutex);
1545   g_free (str);
1546   gst_byte_writer_reset (&bw);
1547   return ret;
1548 
1549 write_failed:
1550   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1551       ("Failed to write to socket"));
1552   ret = FALSE;
1553   goto done;
1554 }
1555 
1556 static GstMessage *
gst_ipc_pipeline_comm_read_message(GstIpcPipelineComm * comm,guint32 size)1557 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1558 {
1559   GstMessage *message = NULL;
1560   gchar *end = NULL;
1561   GstStructure *structure;
1562   guint32 type;
1563   guint32 mapped_size = size;
1564   const guint8 *payload;
1565 
1566   /* this should not be called if we don't have enough yet */
1567   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1568   g_return_val_if_fail (size >= sizeof (type), NULL);
1569 
1570   payload = gst_adapter_map (comm->adapter, mapped_size);
1571   if (!payload)
1572     return NULL;
1573   memcpy (&type, payload, sizeof (type));
1574   payload += sizeof (type);
1575   size -= sizeof (type);
1576   if (size == 0)
1577     goto done;
1578 
1579   if (payload[size - 1])
1580     goto done;
1581   if (*payload) {
1582     structure = gst_structure_from_string ((const char *) payload, &end);
1583   } else {
1584     structure = NULL;
1585   }
1586 
1587   message =
1588       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
1589 
1590 done:
1591   gst_adapter_unmap (comm->adapter);
1592   gst_adapter_flush (comm->adapter, mapped_size);
1593 
1594   return message;
1595 }
1596 
1597 void
gst_ipc_pipeline_comm_init(GstIpcPipelineComm * comm,GstElement * element)1598 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1599 {
1600   g_mutex_init (&comm->mutex);
1601   comm->element = element;
1602   comm->fdin = comm->fdout = -1;
1603   comm->ack_time = DEFAULT_ACK_TIME;
1604   comm->waiting_ids =
1605       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1606       (GDestroyNotify) comm_request_free);
1607   comm->adapter = gst_adapter_new ();
1608   comm->poll = gst_poll_new (TRUE);
1609   gst_poll_fd_init (&comm->pollFDin);
1610 }
1611 
1612 void
gst_ipc_pipeline_comm_clear(GstIpcPipelineComm * comm)1613 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1614 {
1615   g_hash_table_destroy (comm->waiting_ids);
1616   gst_object_unref (comm->adapter);
1617   gst_poll_free (comm->poll);
1618   g_mutex_clear (&comm->mutex);
1619 }
1620 
1621 static void
cancel_request(gpointer key,gpointer value,gpointer user_data,GstFlowReturn fret)1622 cancel_request (gpointer key, gpointer value, gpointer user_data,
1623     GstFlowReturn fret)
1624 {
1625   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1626   guint32 id = GPOINTER_TO_INT (key);
1627   CommRequest *req = (CommRequest *) value;
1628 
1629   GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1630       req->type);
1631   req->ret = fret;
1632   req->replied = TRUE;
1633   g_cond_signal (&req->cond);
1634 }
1635 
1636 static void
cancel_request_error(gpointer key,gpointer value,gpointer user_data)1637 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1638 {
1639   CommRequest *req = (CommRequest *) value;
1640   GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1641 
1642   cancel_request (key, value, user_data, fret);
1643 }
1644 
1645 void
gst_ipc_pipeline_comm_cancel(GstIpcPipelineComm * comm,gboolean cleanup)1646 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1647 {
1648   g_mutex_lock (&comm->mutex);
1649   g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
1650   if (cleanup) {
1651     g_hash_table_unref (comm->waiting_ids);
1652     comm->waiting_ids =
1653         g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1654         (GDestroyNotify) comm_request_free);
1655   }
1656   g_mutex_unlock (&comm->mutex);
1657 }
1658 
1659 static gboolean
set_field(GQuark field_id,const GValue * value,gpointer user_data)1660 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1661 {
1662   GstStructure *structure = user_data;
1663 
1664   gst_structure_id_set_value (structure, field_id, value);
1665 
1666   return TRUE;
1667 }
1668 
1669 static gboolean
gst_ipc_pipeline_comm_reply_request(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret,GstQuery * query)1670 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1671     GstFlowReturn ret, GstQuery * query)
1672 {
1673   CommRequest *req;
1674 
1675   req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1676   if (!req) {
1677     GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1678     return FALSE;
1679   }
1680 
1681   GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1682       comm_request_ret_get_name (req->type, ret), req->id);
1683   req->replied = TRUE;
1684   req->ret = ret;
1685   if (query) {
1686     if (req->query) {
1687       /* We need to update the original query in place, as the caller
1688          will expect the object to be the same */
1689       GstStructure *structure = gst_query_writable_structure (req->query);
1690       gst_structure_remove_all_fields (structure);
1691       gst_structure_foreach (gst_query_get_structure (query), set_field,
1692           structure);
1693     } else {
1694       GST_WARNING_OBJECT (comm->element,
1695           "Got query reply, but no query was in the request");
1696     }
1697   }
1698   g_cond_signal (&req->cond);
1699   return TRUE;
1700 }
1701 
1702 static gint
update_adapter(GstIpcPipelineComm * comm)1703 update_adapter (GstIpcPipelineComm * comm)
1704 {
1705   GstMemory *mem = NULL;
1706   GstBuffer *buf;
1707   GstMapInfo map;
1708   ssize_t sz;
1709   gint ret = 0;
1710 
1711 again:
1712   /* update pollFDin if necessary (fdin changed or we lost our parent).
1713    * we do not allow a parent-less element to communicate with its peer
1714    * in order to avoid race conditions where the slave tries to change
1715    * the state of its parent pipeline while it is not yet added in that
1716    * pipeline. */
1717   if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
1718     if (comm->pollFDin.fd != -1) {
1719       GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1720           comm->pollFDin.fd);
1721       gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1722       gst_poll_fd_init (&comm->pollFDin);
1723     }
1724     if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1725       GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1726       comm->pollFDin.fd = comm->fdin;
1727       gst_poll_add_fd (comm->poll, &comm->pollFDin);
1728       gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1729     }
1730   }
1731 
1732   /* wait for activity on fdin or a flush */
1733   if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1734     if (errno == EAGAIN)
1735       goto again;
1736     /* error out, unless interrupted or flushing */
1737     if (errno != EINTR)
1738       ret = (errno == EBUSY) ? 2 : 1;
1739   }
1740 
1741   /* read from fdin if possible and push data to our adapter */
1742   if (comm->pollFDin.fd >= 0
1743       && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
1744     if (!mem)
1745       mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1746 
1747     gst_memory_map (mem, &map, GST_MAP_WRITE);
1748     sz = read (comm->pollFDin.fd, map.data, map.size);
1749     gst_memory_unmap (mem, &map);
1750 
1751     if (sz <= 0) {
1752       if (errno == EAGAIN)
1753         goto again;
1754       /* error out, unless interrupted */
1755       if (errno != EINTR)
1756         ret = 1;
1757     } else {
1758       gst_memory_resize (mem, 0, sz);
1759       buf = gst_buffer_new ();
1760       gst_buffer_append_memory (buf, mem);
1761       mem = NULL;
1762       GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1763       gst_adapter_push (comm->adapter, buf);
1764     }
1765   }
1766 
1767   if (mem)
1768     gst_memory_unref (mem);
1769 
1770   return ret;
1771 }
1772 
1773 static gboolean
read_many(GstIpcPipelineComm * comm)1774 read_many (GstIpcPipelineComm * comm)
1775 {
1776   gboolean ret = TRUE;
1777   gsize available;
1778   const guint8 *payload;
1779 
1780   while (1)
1781     switch (comm->state) {
1782       case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1783       {
1784         guint8 type;
1785         guint32 mapped_size;
1786 
1787         available = gst_adapter_available (comm->adapter);
1788         mapped_size = 1 + sizeof (gint32) * 2;
1789         if (available < mapped_size)
1790           goto done;
1791 
1792         payload = gst_adapter_map (comm->adapter, mapped_size);
1793         type = *payload++;
1794         g_mutex_lock (&comm->mutex);
1795         memcpy (&comm->id, payload, sizeof (guint32));
1796         memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1797         g_mutex_unlock (&comm->mutex);
1798         gst_adapter_unmap (comm->adapter);
1799         gst_adapter_flush (comm->adapter, mapped_size);
1800         GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1801             comm->id, type, comm->payload_length);
1802         switch (type) {
1803           case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1804           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1805           case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1806           case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1807           case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1808           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1809           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1810           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1811           case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1812           case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1813             GST_TRACE_OBJECT (comm->element, "switching to state %s",
1814                 gst_ipc_pipeline_comm_data_type_get_name (type));
1815             comm->state = type;
1816             break;
1817           default:
1818             goto out_of_sync;
1819         }
1820         break;
1821       }
1822       case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1823       {
1824         const guint8 *rets;
1825         guint32 ret32;
1826 
1827         available = gst_adapter_available (comm->adapter);
1828         if (available < comm->payload_length)
1829           goto done;
1830 
1831         if (available < sizeof (guint32))
1832           goto ack_failed;
1833 
1834         rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1835         memcpy (&ret32, rets, sizeof (ret32));
1836         gst_adapter_unmap (comm->adapter);
1837         gst_adapter_flush (comm->adapter, sizeof (guint32));
1838         GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1839             gst_flow_get_name (ret32), comm->id);
1840 
1841         g_mutex_lock (&comm->mutex);
1842         gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1843         g_mutex_unlock (&comm->mutex);
1844 
1845         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1846         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1847         break;
1848       }
1849       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1850       {
1851         GstQuery *query = NULL;
1852         gboolean qret;
1853 
1854         available = gst_adapter_available (comm->adapter);
1855         if (available < comm->payload_length)
1856           goto done;
1857 
1858         qret =
1859             gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1860             &query);
1861 
1862         GST_TRACE_OBJECT (comm->element,
1863             "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
1864             query);
1865 
1866         g_mutex_lock (&comm->mutex);
1867         gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1868         g_mutex_unlock (&comm->mutex);
1869 
1870         gst_query_unref (query);
1871 
1872         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1873         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1874         break;
1875       }
1876       case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1877       {
1878         GstBuffer *buf;
1879 
1880         available = gst_adapter_available (comm->adapter);
1881         if (available < comm->payload_length)
1882           goto done;
1883 
1884         buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1885         if (!buf)
1886           goto buffer_failed;
1887 
1888         /* set caps and push */
1889         GST_TRACE_OBJECT (comm->element,
1890             "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1891             ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1892             ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1893             ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1894             GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1895             GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1896             GST_BUFFER_FLAGS (buf));
1897 
1898         gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1899             GINT_TO_POINTER (comm->id), NULL);
1900 
1901         if (comm->on_buffer)
1902           (*comm->on_buffer) (comm->id, buf, comm->user_data);
1903 
1904         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1905         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1906         break;
1907       }
1908       case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1909       {
1910         GstEvent *event;
1911         gboolean upstream;
1912 
1913         available = gst_adapter_available (comm->adapter);
1914         if (available < comm->payload_length)
1915           goto done;
1916 
1917         event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1918             &upstream);
1919         if (!event)
1920           goto event_failed;
1921 
1922         GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1923             event, gst_event_type_get_name (event->type));
1924 
1925         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1926             GINT_TO_POINTER (comm->id), NULL);
1927 
1928         if (comm->on_event)
1929           (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1930 
1931         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1932         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1933         break;
1934       }
1935       case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1936       {
1937         GstEvent *event;
1938 
1939         available = gst_adapter_available (comm->adapter);
1940         if (available < comm->payload_length)
1941           goto done;
1942 
1943         event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1944             comm->payload_length);
1945         if (!event)
1946           goto event_failed;
1947 
1948         GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1949             event);
1950 
1951         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1952             GINT_TO_POINTER (comm->id), NULL);
1953 
1954         if (comm->on_event)
1955           (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1956 
1957         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1958         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1959         break;
1960       }
1961       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1962       {
1963         GstQuery *query;
1964         gboolean upstream;
1965 
1966         available = gst_adapter_available (comm->adapter);
1967         if (available < comm->payload_length)
1968           goto done;
1969 
1970         query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
1971             &upstream);
1972         if (!query)
1973           goto query_failed;
1974 
1975         GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
1976             query, gst_query_type_get_name (query->type));
1977 
1978         gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
1979             GINT_TO_POINTER (comm->id), NULL);
1980 
1981         if (comm->on_query)
1982           (*comm->on_query) (comm->id, query, upstream, comm->user_data);
1983 
1984         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1985         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1986         break;
1987       }
1988       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1989       {
1990         guint32 transition;
1991 
1992         available = gst_adapter_available (comm->adapter);
1993         if (available < comm->payload_length)
1994           goto done;
1995 
1996         if (!gst_ipc_pipeline_comm_read_state_change (comm,
1997                 comm->payload_length, &transition))
1998           goto state_change_failed;
1999 
2000         GST_TRACE_OBJECT (comm->element,
2001             "deserialized state change request: %s -> %s",
2002             gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2003                 (transition)),
2004             gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2005                 (transition)));
2006 
2007         if (comm->on_state_change)
2008           (*comm->on_state_change) (comm->id, transition, comm->user_data);
2009 
2010         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2011         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2012         break;
2013       }
2014       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2015       {
2016         available = gst_adapter_available (comm->adapter);
2017         if (available < comm->payload_length)
2018           goto done;
2019 
2020         if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2021           goto event_failed;
2022 
2023         GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2024 
2025         if (comm->on_state_lost)
2026           (*comm->on_state_lost) (comm->user_data);
2027 
2028         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2029         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2030         break;
2031       }
2032       case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2033       {
2034         GstMessage *message;
2035 
2036         available = gst_adapter_available (comm->adapter);
2037         if (available < comm->payload_length)
2038           goto done;
2039 
2040         message = gst_ipc_pipeline_comm_read_message (comm,
2041             comm->payload_length);
2042         if (!message)
2043           goto message_failed;
2044 
2045         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2046             message, gst_message_type_get_name (message->type));
2047 
2048         if (comm->on_message)
2049           (*comm->on_message) (comm->id, message, comm->user_data);
2050 
2051         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2052         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2053         break;
2054       }
2055       case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2056       {
2057         GstMessage *message;
2058 
2059         available = gst_adapter_available (comm->adapter);
2060         if (available < comm->payload_length)
2061           goto done;
2062 
2063         message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2064             comm->payload_length);
2065         if (!message)
2066           goto message_failed;
2067 
2068         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2069             message, gst_message_type_get_name (message->type));
2070 
2071         if (comm->on_message)
2072           (*comm->on_message) (comm->id, message, comm->user_data);
2073 
2074         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2075         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2076         break;
2077       }
2078     }
2079 
2080 done:
2081   return ret;
2082 
2083   /* ERRORS */
2084 out_of_sync:
2085   {
2086     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2087         ("Socket out of sync"));
2088     ret = FALSE;
2089     goto done;
2090   }
2091 state_change_failed:
2092   {
2093     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2094         ("could not read state change from fd"));
2095     ret = FALSE;
2096     goto done;
2097   }
2098 ack_failed:
2099   {
2100     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2101         ("could not read ack from fd"));
2102     ret = FALSE;
2103     goto done;
2104   }
2105 buffer_failed:
2106   {
2107     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2108         ("could not read buffer from fd"));
2109     ret = FALSE;
2110     goto done;
2111   }
2112 event_failed:
2113   {
2114     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2115         ("could not read event from fd"));
2116     ret = FALSE;
2117     goto done;
2118   }
2119 message_failed:
2120   {
2121     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122         ("could not read message from fd"));
2123     ret = FALSE;
2124     goto done;
2125   }
2126 query_failed:
2127   {
2128     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129         ("could not read query from fd"));
2130     ret = FALSE;
2131     goto done;
2132   }
2133 }
2134 
2135 static gpointer
reader_thread(gpointer data)2136 reader_thread (gpointer data)
2137 {
2138   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2139   gboolean running = TRUE;
2140   gint ret = 0;
2141 
2142   while (running) {
2143     ret = update_adapter (comm);
2144     switch (ret) {
2145       case 1:
2146         GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2147             ("Failed to read from socket"));
2148         running = FALSE;
2149         break;
2150       case 2:
2151         GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2152         running = FALSE;
2153         break;
2154       default:
2155         read_many (comm);
2156         break;
2157     }
2158   }
2159 
2160   GST_INFO_OBJECT (comm->element, "Reader thread ending");
2161   return NULL;
2162 }
2163 
2164 gboolean
gst_ipc_pipeline_comm_start_reader_thread(GstIpcPipelineComm * comm,void (* on_buffer)(guint32,GstBuffer *,gpointer),void (* on_event)(guint32,GstEvent *,gboolean,gpointer),void (* on_query)(guint32,GstQuery *,gboolean,gpointer),void (* on_state_change)(guint32,GstStateChange,gpointer),void (* on_state_lost)(gpointer),void (* on_message)(guint32,GstMessage *,gpointer),gpointer user_data)2165 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2166     void (*on_buffer) (guint32, GstBuffer *, gpointer),
2167     void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2168     void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2169     void (*on_state_change) (guint32, GstStateChange, gpointer),
2170     void (*on_state_lost) (gpointer),
2171     void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2172 {
2173   if (comm->reader_thread)
2174     return FALSE;
2175 
2176   comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2177   comm->on_buffer = on_buffer;
2178   comm->on_event = on_event;
2179   comm->on_query = on_query;
2180   comm->on_state_change = on_state_change;
2181   comm->on_state_lost = on_state_lost;
2182   comm->on_message = on_message;
2183   comm->user_data = user_data;
2184   gst_poll_set_flushing (comm->poll, FALSE);
2185   comm->reader_thread =
2186       g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
2187   return TRUE;
2188 }
2189 
2190 void
gst_ipc_pipeline_comm_stop_reader_thread(GstIpcPipelineComm * comm)2191 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2192 {
2193   if (!comm->reader_thread)
2194     return;
2195 
2196   gst_poll_set_flushing (comm->poll, TRUE);
2197   g_thread_join (comm->reader_thread);
2198   comm->reader_thread = NULL;
2199 }
2200 
2201 static gchar *
gst_value_serialize_event(const GValue * value)2202 gst_value_serialize_event (const GValue * value)
2203 {
2204   const GstStructure *structure;
2205   GstEvent *ev;
2206   gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2207   GValue val = G_VALUE_INIT;
2208 
2209   ev = g_value_get_boxed (value);
2210 
2211   g_value_init (&val, gst_event_type_get_type ());
2212   g_value_set_enum (&val, ev->type);
2213   type = gst_value_serialize (&val);
2214   g_value_unset (&val);
2215 
2216   g_value_init (&val, G_TYPE_UINT64);
2217   g_value_set_uint64 (&val, ev->timestamp);
2218   ts = gst_value_serialize (&val);
2219   g_value_unset (&val);
2220 
2221   g_value_init (&val, G_TYPE_UINT);
2222   g_value_set_uint (&val, ev->seqnum);
2223   seqnum = gst_value_serialize (&val);
2224   g_value_unset (&val);
2225 
2226   g_value_init (&val, G_TYPE_INT64);
2227   g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2228   rt_offset = gst_value_serialize (&val);
2229   g_value_unset (&val);
2230 
2231   structure = gst_event_get_structure (ev);
2232   str = gst_structure_to_string (structure);
2233   str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2234   g_strdelimit (str64, "=", '_');
2235   g_free (str);
2236 
2237   s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
2238       NULL);
2239 
2240   g_free (type);
2241   g_free (ts);
2242   g_free (seqnum);
2243   g_free (rt_offset);
2244   g_free (str64);
2245 
2246   return s;
2247 }
2248 
2249 static gboolean
gst_value_deserialize_event(GValue * dest,const gchar * s)2250 gst_value_deserialize_event (GValue * dest, const gchar * s)
2251 {
2252   GstEvent *ev = NULL;
2253   GValue val = G_VALUE_INIT;
2254   gboolean ret = FALSE;
2255   gchar **fields;
2256   gsize len;
2257 
2258   fields = g_strsplit (s, ":", -1);
2259   if (g_strv_length (fields) != 5)
2260     goto wrong_length;
2261 
2262   g_strdelimit (fields[4], "_", '=');
2263   g_base64_decode_inplace (fields[4], &len);
2264 
2265   g_value_init (&val, gst_event_type_get_type ());
2266   if (!gst_value_deserialize (&val, fields[0]))
2267     goto fail;
2268   ev = gst_event_new_custom (g_value_get_enum (&val),
2269       gst_structure_new_from_string (fields[4]));
2270 
2271   g_value_unset (&val);
2272   g_value_init (&val, G_TYPE_UINT64);
2273   if (!gst_value_deserialize (&val, fields[1]))
2274     goto fail;
2275   ev->timestamp = g_value_get_uint64 (&val);
2276 
2277   g_value_unset (&val);
2278   g_value_init (&val, G_TYPE_UINT);
2279   if (!gst_value_deserialize (&val, fields[2]))
2280     goto fail;
2281   ev->seqnum = g_value_get_uint (&val);
2282 
2283   g_value_unset (&val);
2284   g_value_init (&val, G_TYPE_INT64);
2285   if (!gst_value_deserialize (&val, fields[3]))
2286     goto fail;
2287   gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
2288 
2289   g_value_take_boxed (dest, ev);
2290   ev = NULL;
2291   ret = TRUE;
2292 
2293 fail:
2294   g_clear_pointer (&ev, gst_event_unref);
2295   g_value_unset (&val);
2296 
2297 wrong_length:
2298   g_strfreev (fields);
2299   return ret;
2300 }
2301 
/*
 * Registers gst_value_serialize_<_type>() and gst_value_deserialize_<_type>()
 * as the (de)serialization handlers for _gtype, with no compare function.
 * The GType is filled in at run time because it is obtained from a function
 * call and therefore cannot appear in the static initializer.
 */
#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)                \
G_STMT_START {                                                          \
  static GstValueTable value_table = {                                  \
    0,                                                                  \
    NULL,                                                               \
    gst_value_serialize_ ## _type,                                      \
    gst_value_deserialize_ ## _type                                     \
  };                                                                    \
  value_table.type = (_gtype);                                          \
  gst_value_register (&value_table);                                    \
} G_STMT_END
2310 
2311 void
gst_ipc_pipeline_comm_plugin_init(void)2312 gst_ipc_pipeline_comm_plugin_init (void)
2313 {
2314   static volatile gsize once = 0;
2315 
2316   if (g_once_init_enter (&once)) {
2317     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2318         "ipc pipeline comm");
2319     QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2320     REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2321     g_once_init_leave (&once, (gsize) 1);
2322   }
2323 }
2324