1 /* GStreamer
2 * Copyright (C) 2015-2017 YouView TV Ltd
3 * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
4 *
5 * gstipcpipelinecomm.c:
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 # include "config.h"
25 #endif
26
27 #include <unistd.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <gst/base/gstbytewriter.h>
31 #include <gst/gstprotection.h>
32 #include "gstipcpipelinecomm.h"
33
/* Debug category used by the GST_*_OBJECT logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug

/* Ten times G_TIME_SPAN_SECOND.
 * NOTE(review): presumably the default for comm->ack_time, which
 * comm_request_wait() adds to g_get_monotonic_time() -- confirm against the
 * initialisation code, which is not visible in this part of the file. */
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)

/* NOTE(review): not static and not referenced in this part of the file;
 * presumably set up and used elsewhere -- confirm before changing linkage. */
GQuark QUARK_ID;
40
/* How the sender of a request waits for the peer's reply; consumed by
 * gst_ipc_pipeline_comm_sync_fd() and comm_request_wait(). */
typedef enum
{
  ACK_TYPE_NONE,                /* do not wait at all */
  ACK_TYPE_TIMED,               /* wait at most comm->ack_time */
  ACK_TYPE_BLOCKING             /* wait without a deadline */
} AckType;
47
/* Kind of request being acknowledged.  It selects how the 32-bit reply value
 * is interpreted (see comm_request_ret_get_name()) and which value stands for
 * failure (see comm_request_ret_get_failure_value()). */
typedef enum
{
  COMM_REQUEST_TYPE_BUFFER,     /* reply is a GstFlowReturn */
  COMM_REQUEST_TYPE_EVENT,      /* reply is a boolean */
  COMM_REQUEST_TYPE_QUERY,      /* reply is a boolean */
  COMM_REQUEST_TYPE_STATE_CHANGE,       /* reply is a GstStateChangeReturn */
  COMM_REQUEST_TYPE_MESSAGE,    /* reply is a boolean */
} CommRequestType;
56
/* Book-keeping for one request that is waiting for the peer's reply. */
typedef struct
{
  guint32 id;                   /* request id; also the key in comm->waiting_ids */
  gboolean replied;             /* polled by comm_request_wait() until TRUE */
  gboolean comm_error;          /* set by comm_request_wait() on timeout */
  guint32 ret;                  /* reply value, preset to the failure value of type */
  GstQuery *query;              /* stored as passed to comm_request_new() */
  CommRequestType type;         /* how to interpret ret */
  GCond cond;                   /* waited on together with comm->mutex */
} CommRequest;
67
/* Forward declarations: both helpers are defined further down but are
 * already needed by comm_request_new() and comm_request_wait(). */
static const gchar *comm_request_ret_get_name (CommRequestType type,
    guint32 ret);
static guint32 comm_request_ret_get_failure_value (CommRequestType type);
71
72 static CommRequest *
comm_request_new(guint32 id,CommRequestType type,GstQuery * query)73 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
74 {
75 CommRequest *req;
76
77 req = g_malloc (sizeof (CommRequest));
78 req->id = id;
79 g_cond_init (&req->cond);
80 req->replied = FALSE;
81 req->comm_error = FALSE;
82 req->query = query;
83 req->ret = comm_request_ret_get_failure_value (type);
84 req->type = type;
85
86 return req;
87 }
88
89 static guint32
comm_request_wait(GstIpcPipelineComm * comm,CommRequest * req,AckType ack_type)90 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
91 AckType ack_type)
92 {
93 guint32 ret = comm_request_ret_get_failure_value (req->type);
94 guint64 end_time;
95
96 if (ack_type == ACK_TYPE_TIMED)
97 end_time = g_get_monotonic_time () + comm->ack_time;
98 else
99 end_time = G_MAXUINT64;
100
101 GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
102 req->id);
103 while (!req->replied) {
104 if (ack_type == ACK_TYPE_TIMED) {
105 if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
106 break;
107 } else
108 g_cond_wait (&req->cond, &comm->mutex);
109 }
110
111 if (req->replied) {
112 ret = req->ret;
113 GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
114 req->id, ret, comm_request_ret_get_name (req->type, ret));
115 } else {
116 req->comm_error = TRUE;
117 GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
118 req->id);
119 }
120
121 return ret;
122 }
123
124 static void
comm_request_free(CommRequest * req)125 comm_request_free (CommRequest * req)
126 {
127 g_cond_clear (&req->cond);
128 g_free (req);
129 }
130
131 static const gchar *
comm_request_ret_get_name(CommRequestType type,guint32 ret)132 comm_request_ret_get_name (CommRequestType type, guint32 ret)
133 {
134 switch (type) {
135 case COMM_REQUEST_TYPE_BUFFER:
136 return gst_flow_get_name (ret);
137 case COMM_REQUEST_TYPE_EVENT:
138 case COMM_REQUEST_TYPE_QUERY:
139 case COMM_REQUEST_TYPE_MESSAGE:
140 return ret ? "TRUE" : "FALSE";
141 case COMM_REQUEST_TYPE_STATE_CHANGE:
142 return gst_element_state_change_return_get_name (ret);
143 default:
144 g_assert_not_reached ();
145 }
146 }
147
148 static guint32
comm_request_ret_get_failure_value(CommRequestType type)149 comm_request_ret_get_failure_value (CommRequestType type)
150 {
151 switch (type) {
152 case COMM_REQUEST_TYPE_BUFFER:
153 return GST_FLOW_COMM_ERROR;
154 case COMM_REQUEST_TYPE_EVENT:
155 case COMM_REQUEST_TYPE_MESSAGE:
156 case COMM_REQUEST_TYPE_QUERY:
157 return FALSE;
158 case COMM_REQUEST_TYPE_STATE_CHANGE:
159 return GST_STATE_CHANGE_FAILURE;
160 default:
161 g_assert_not_reached ();
162 }
163 }
164
165 static const gchar *
gst_ipc_pipeline_comm_data_type_get_name(GstIpcPipelineCommDataType type)166 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
167 {
168 switch (type) {
169 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
170 return "ACK";
171 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
172 return "QUERY_RESULT";
173 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
174 return "BUFFER";
175 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
176 return "EVENT";
177 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
178 return "SINK_MESSAGE_EVENT";
179 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
180 return "QUERY";
181 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
182 return "STATE_CHANGE";
183 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
184 return "STATE_LOST";
185 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
186 return "MESSAGE";
187 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
188 return "GERROR_MESSAGE";
189 default:
190 return "UNKNOWN";
191 }
192 }
193
194 static gboolean
gst_ipc_pipeline_comm_sync_fd(GstIpcPipelineComm * comm,guint32 id,GstQuery * query,guint32 * ret,AckType ack_type,CommRequestType type)195 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
196 GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
197 {
198 CommRequest *req;
199 gboolean comm_error;
200 GHashTable *waiting_ids;
201
202 if (ack_type == ACK_TYPE_NONE)
203 return TRUE;
204
205 req = comm_request_new (id, type, query);
206 waiting_ids = g_hash_table_ref (comm->waiting_ids);
207 g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
208 *ret = comm_request_wait (comm, req, ack_type);
209 comm_error = req->comm_error;
210 g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
211 g_hash_table_unref (waiting_ids);
212 return !comm_error;
213 }
214
215 static gboolean
write_to_fd_raw(GstIpcPipelineComm * comm,const void * data,size_t size)216 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
217 {
218 size_t offset;
219 gboolean ret = TRUE;
220
221 offset = 0;
222 GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size);
223 while (size) {
224 ssize_t written =
225 write (comm->fdout, (const unsigned char *) data + offset, size);
226 if (written < 0) {
227 if (errno == EAGAIN || errno == EINTR)
228 continue;
229 GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
230 strerror (errno));
231 ret = FALSE;
232 goto done;
233 }
234 size -= written;
235 offset += written;
236 }
237
238 done:
239 return ret;
240 }
241
242 static gboolean
write_byte_writer_to_fd(GstIpcPipelineComm * comm,GstByteWriter * bw)243 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
244 {
245 guint8 *data;
246 gboolean ret;
247 guint size;
248
249 size = gst_byte_writer_get_size (bw);
250 data = gst_byte_writer_reset_and_get_data (bw);
251 if (!data)
252 return FALSE;
253 ret = write_to_fd_raw (comm, data, size);
254 g_free (data);
255 return ret;
256 }
257
258 static void
gst_ipc_pipeline_comm_write_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,guint32 ret,CommRequestType type)259 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
260 guint32 ret, CommRequestType type)
261 {
262 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
263 guint32 size;
264 GstByteWriter bw;
265
266 g_mutex_lock (&comm->mutex);
267
268 GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
269 comm_request_ret_get_name (type, ret), ret);
270 gst_byte_writer_init (&bw);
271 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
272 goto write_failed;
273 if (!gst_byte_writer_put_uint32_le (&bw, id))
274 goto write_failed;
275 size = sizeof (ret);
276 if (!gst_byte_writer_put_uint32_le (&bw, size))
277 goto write_failed;
278 if (!gst_byte_writer_put_uint32_le (&bw, ret))
279 goto write_failed;
280
281 if (!write_byte_writer_to_fd (comm, &bw))
282 goto write_failed;
283
284 done:
285 g_mutex_unlock (&comm->mutex);
286 gst_byte_writer_reset (&bw);
287 return;
288
289 write_failed:
290 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
291 ("Failed to write to socket"));
292 goto done;
293 }
294
295 void
gst_ipc_pipeline_comm_write_flow_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret)296 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
297 guint32 id, GstFlowReturn ret)
298 {
299 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
300 COMM_REQUEST_TYPE_BUFFER);
301 }
302
303 void
gst_ipc_pipeline_comm_write_boolean_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean ret)304 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
305 guint32 id, gboolean ret)
306 {
307 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
308 COMM_REQUEST_TYPE_EVENT);
309 }
310
311 void
gst_ipc_pipeline_comm_write_state_change_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstStateChangeReturn ret)312 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
313 guint32 id, GstStateChangeReturn ret)
314 {
315 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
316 COMM_REQUEST_TYPE_STATE_CHANGE);
317 }
318
319 void
gst_ipc_pipeline_comm_write_query_result_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean result,GstQuery * query)320 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
321 guint32 id, gboolean result, GstQuery * query)
322 {
323 const unsigned char payload_type =
324 GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
325 guint8 result8 = result;
326 guint32 size;
327 size_t len;
328 char *str = NULL;
329 guint32 type;
330 const GstStructure *structure;
331 GstByteWriter bw;
332
333 g_mutex_lock (&comm->mutex);
334
335 GST_TRACE_OBJECT (comm->element,
336 "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
337 gst_byte_writer_init (&bw);
338 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
339 goto write_failed;
340 if (!gst_byte_writer_put_uint32_le (&bw, id))
341 goto write_failed;
342 structure = gst_query_get_structure (query);
343 if (structure) {
344 str = gst_structure_to_string (structure);
345 len = strlen (str);
346 } else {
347 str = NULL;
348 len = 0;
349 }
350 size = 1 + sizeof (guint32) + len + 1;
351 if (!gst_byte_writer_put_uint32_le (&bw, size))
352 goto write_failed;
353 if (!gst_byte_writer_put_uint8 (&bw, result8))
354 goto write_failed;
355 type = GST_QUERY_TYPE (query);
356 if (!gst_byte_writer_put_uint32_le (&bw, type))
357 goto write_failed;
358 if (str) {
359 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
360 goto write_failed;
361 } else {
362 if (!gst_byte_writer_put_uint8 (&bw, 0))
363 goto write_failed;
364 }
365
366 if (!write_byte_writer_to_fd (comm, &bw))
367 goto write_failed;
368
369 done:
370 g_mutex_unlock (&comm->mutex);
371 gst_byte_writer_reset (&bw);
372 g_free (str);
373 return;
374
375 write_failed:
376 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
377 ("Failed to write to socket"));
378 goto done;
379 }
380
381 static gboolean
gst_ipc_pipeline_comm_read_query_result(GstIpcPipelineComm * comm,guint32 size,GstQuery ** query)382 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
383 guint32 size, GstQuery ** query)
384 {
385 gchar *end = NULL;
386 GstStructure *structure;
387 guint8 result;
388 guint32 type;
389 const guint8 *payload = NULL;
390 guint32 mapped_size = size;
391
392 /* this should not be called if we don't have enough yet */
393 *query = NULL;
394 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
395 g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
396
397 payload = gst_adapter_map (comm->adapter, mapped_size);
398 if (!payload)
399 return FALSE;
400 result = *payload++;
401 memcpy (&type, payload, sizeof (type));
402 payload += sizeof (type);
403
404 size -= 1 + sizeof (guint32);
405 if (size == 0)
406 goto done;
407
408 if (payload[size - 1]) {
409 result = FALSE;
410 goto done;
411 }
412 if (*payload) {
413 structure = gst_structure_from_string ((const char *) payload, &end);
414 } else {
415 structure = NULL;
416 }
417 if (!structure) {
418 result = FALSE;
419 goto done;
420 }
421
422 *query = gst_query_new_custom (type, structure);
423
424 done:
425 gst_adapter_unmap (comm->adapter);
426 gst_adapter_flush (comm->adapter, mapped_size);
427 return result;
428 }
429
/* Description of one GstMeta as it is sent over the wire; filled in by
 * build_meta() and serialized by
 * gst_ipc_pipeline_comm_write_buffer_to_fd(). */
typedef struct
{
  guint32 bytes;                /* serialized size of this entry, in bytes */

  guint64 size;                 /* copy of GstMetaInfo::size */
  guint32 flags;                /* copy of GstMeta::flags */
  guint64 api;                  /* GstMetaInfo::api (a GType) */
  char *str;                    /* serialized GstProtectionMeta info, or NULL */
} MetaBuildInfo;
439
/* Accumulator handed to build_meta() through gst_buffer_foreach_meta(). */
typedef struct
{
  GstIpcPipelineComm *comm;     /* only used for logging */
  guint32 n_meta;               /* number of entries in info */
  guint32 total_bytes;          /* serialized size of the whole meta list */
  MetaBuildInfo *info;          /* array grown by one entry per meta */
} MetaListRepresentation;
447
448 static gboolean
build_meta(GstBuffer * buffer,GstMeta ** meta,gpointer user_data)449 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
450 {
451 MetaListRepresentation *repr = user_data;
452
453 repr->n_meta++;
454 repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
455 repr->info[repr->n_meta - 1].bytes =
456 /* 4 byte bytes */
457 4
458 /* 4 byte GstMetaFlags */
459 + 4
460 /* GstMetaInfo::api */
461 + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
462 /* GstMetaInfo::size */
463 + 8
464 /* str length */
465 + 4;
466
467 repr->info[repr->n_meta - 1].flags = (*meta)->flags;
468 repr->info[repr->n_meta - 1].api = (*meta)->info->api;
469 repr->info[repr->n_meta - 1].size = (*meta)->info->size;
470 repr->info[repr->n_meta - 1].str = NULL;
471
472 /* GstMeta is a base class, and actual useful classes are all different...
473 So we list a few of them we know we want and ignore the open ended rest */
474 if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
475 GstProtectionMeta *m = (GstProtectionMeta *) * meta;
476 repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
477 repr->info[repr->n_meta - 1].bytes +=
478 strlen (repr->info[repr->n_meta - 1].str) + 1;
479 GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
480 g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
481 } else {
482 GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
483 g_type_name ((*meta)->info->api));
484 }
485 repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
486 return TRUE;
487 }
488
/* Fixed-size buffer header.  It is copied to and from the wire as raw bytes
 * (gst_byte_writer_put_data() on the sending side, memcpy() on the receiving
 * side), so both ends must agree on its layout and byte order. */
typedef struct
{
  guint64 pts;                  /* GST_BUFFER_PTS */
  guint64 dts;                  /* GST_BUFFER_DTS */
  guint64 duration;             /* GST_BUFFER_DURATION */
  guint64 offset;               /* GST_BUFFER_OFFSET */
  guint64 offset_end;           /* GST_BUFFER_OFFSET_END */
  guint64 flags;                /* GST_BUFFER_FLAGS */
} CommBufferMetadata;
498
499 GstFlowReturn
gst_ipc_pipeline_comm_write_buffer_to_fd(GstIpcPipelineComm * comm,GstBuffer * buffer)500 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
501 GstBuffer * buffer)
502 {
503 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
504 GstMapInfo map;
505 guint32 ret32 = GST_FLOW_OK;
506 guint32 size, n;
507 CommBufferMetadata meta;
508 GstFlowReturn ret;
509 MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */
510 GstByteWriter bw;
511
512 g_mutex_lock (&comm->mutex);
513 ++comm->send_id;
514
515 GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
516 comm->send_id, buffer);
517
518 gst_byte_writer_init (&bw);
519
520 meta.pts = GST_BUFFER_PTS (buffer);
521 meta.dts = GST_BUFFER_DTS (buffer);
522 meta.duration = GST_BUFFER_DURATION (buffer);
523 meta.offset = GST_BUFFER_OFFSET (buffer);
524 meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
525 meta.flags = GST_BUFFER_FLAGS (buffer);
526
527 /* work out meta size */
528 gst_buffer_foreach_meta (buffer, build_meta, &repr);
529
530 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
531 goto write_failed;
532 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
533 goto write_failed;
534 size =
535 gst_buffer_get_size (buffer) + sizeof (guint32) +
536 sizeof (CommBufferMetadata) + repr.total_bytes;
537 if (!gst_byte_writer_put_uint32_le (&bw, size))
538 goto write_failed;
539 if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
540 goto write_failed;
541 size = gst_buffer_get_size (buffer);
542 if (!gst_byte_writer_put_uint32_le (&bw, size))
543 goto write_failed;
544 if (!write_byte_writer_to_fd (comm, &bw))
545 goto write_failed;
546
547 if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
548 goto map_failed;
549 ret = write_to_fd_raw (comm, map.data, map.size);
550 gst_buffer_unmap (buffer, &map);
551 if (!ret)
552 goto write_failed;
553
554 /* meta */
555 gst_byte_writer_init (&bw);
556 if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
557 goto write_failed;
558 for (n = 0; n < repr.n_meta; ++n) {
559 const MetaBuildInfo *info = repr.info + n;
560 guint32 len;
561 const char *s;
562
563 if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
564 goto write_failed;
565
566 if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
567 goto write_failed;
568
569 s = g_type_name (info->api);
570 len = strlen (s) + 1;
571 if (!gst_byte_writer_put_uint32_le (&bw, len))
572 goto write_failed;
573 if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
574 goto write_failed;
575
576 if (!gst_byte_writer_put_uint64_le (&bw, info->size))
577 goto write_failed;
578
579 s = info->str;
580 len = s ? (strlen (s) + 1) : 0;
581 if (!gst_byte_writer_put_uint32_le (&bw, len))
582 goto write_failed;
583 if (len)
584 if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
585 goto write_failed;
586 }
587
588 if (!write_byte_writer_to_fd (comm, &bw))
589 goto write_failed;
590
591 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
592 ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
593 goto wait_failed;
594 ret = ret32;
595
596 done:
597 g_mutex_unlock (&comm->mutex);
598 gst_byte_writer_reset (&bw);
599 for (n = 0; n < repr.n_meta; ++n)
600 g_free (repr.info[n].str);
601 g_free (repr.info);
602 return ret;
603
604 write_failed:
605 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
606 ("Failed to write to socket"));
607 ret = GST_FLOW_COMM_ERROR;
608 goto done;
609
610 wait_failed:
611 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
612 ("Failed to wait for reply on socket"));
613 ret = GST_FLOW_COMM_ERROR;
614 goto done;
615
616 map_failed:
617 GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
618 ("Failed to map buffer"));
619 ret = GST_FLOW_ERROR;
620 goto done;
621 }
622
623 static GstBuffer *
gst_ipc_pipeline_comm_read_buffer(GstIpcPipelineComm * comm,guint32 size)624 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
625 {
626 GstBuffer *buffer;
627 CommBufferMetadata meta;
628 guint32 n_meta, n;
629 const guint8 *payload = NULL;
630 guint32 mapped_size, buffer_data_size;
631
632 /* this should not be called if we don't have enough yet */
633 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
634 g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
635
636 mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
637 payload = gst_adapter_map (comm->adapter, mapped_size);
638 if (!payload)
639 return NULL;
640 memcpy (&meta, payload, sizeof (CommBufferMetadata));
641 payload += sizeof (CommBufferMetadata);
642 memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
643 size -= mapped_size;
644 gst_adapter_unmap (comm->adapter);
645 gst_adapter_flush (comm->adapter, mapped_size);
646
647 if (buffer_data_size == 0) {
648 buffer = gst_buffer_new ();
649 } else {
650 buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
651 gst_adapter_flush (comm->adapter, buffer_data_size);
652 }
653 size -= buffer_data_size;
654
655 GST_BUFFER_PTS (buffer) = meta.pts;
656 GST_BUFFER_DTS (buffer) = meta.dts;
657 GST_BUFFER_DURATION (buffer) = meta.duration;
658 GST_BUFFER_OFFSET (buffer) = meta.offset;
659 GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
660 GST_BUFFER_FLAGS (buffer) = meta.flags;
661
662 /* If you don't call that, the GType isn't yet known at the
663 g_type_from_name below */
664 gst_protection_meta_get_info ();
665
666 mapped_size = size;
667 payload = gst_adapter_map (comm->adapter, mapped_size);
668 if (!payload) {
669 gst_buffer_unref (buffer);
670 return NULL;
671 }
672 memcpy (&n_meta, payload, sizeof (n_meta));
673 payload += sizeof (n_meta);
674
675 for (n = 0; n < n_meta; ++n) {
676 guint32 flags, len, bytes;
677 guint64 msize;
678 GType api;
679 GstMeta *meta;
680 GstStructure *structure = NULL;
681
682 memcpy (&bytes, payload, sizeof (bytes));
683 payload += sizeof (bytes);
684
685 #define READ_FIELD(f) do { \
686 memcpy (&f, payload, sizeof (f)); \
687 payload += sizeof(f); \
688 } while(0)
689
690 READ_FIELD (flags);
691 READ_FIELD (len);
692 api = g_type_from_name ((const char *) payload);
693 payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
694 READ_FIELD (msize);
695 READ_FIELD (len);
696 if (len) {
697 structure = gst_structure_new_from_string ((const char *) payload);
698 payload += len + 1;
699 }
700
701 /* Seems we can add a meta from the api nor type ? */
702 if (api == GST_PROTECTION_META_API_TYPE) {
703 meta =
704 gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
705 ((GstProtectionMeta *) meta)->info = structure;
706 } else {
707 GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
708 g_type_name (api));
709 if (structure)
710 gst_structure_free (structure);
711 }
712
713 #undef READ_FIELD
714
715 }
716
717 gst_adapter_unmap (comm->adapter);
718 gst_adapter_flush (comm->adapter, mapped_size);
719
720 return buffer;
721 }
722
723 static gboolean
gst_ipc_pipeline_comm_write_sink_message_event_to_fd(GstIpcPipelineComm * comm,GstEvent * event)724 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
725 GstEvent * event)
726 {
727 const unsigned char payload_type =
728 GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
729 gboolean ret;
730 guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
731 char *str = NULL;
732 const GstStructure *structure;
733 GstMessage *message = NULL;
734 const char *name;
735 GstByteWriter bw;
736
737 g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
738 FALSE);
739
740 g_mutex_lock (&comm->mutex);
741 ++comm->send_id;
742
743 GST_TRACE_OBJECT (comm->element,
744 "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
745
746 gst_byte_writer_init (&bw);
747 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
748 goto write_failed;
749 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
750 goto write_failed;
751 name = gst_structure_get_name (gst_event_get_structure (event));
752 slen = strlen (name) + 1;
753 gst_event_parse_sink_message (event, &message);
754 structure = gst_message_get_structure (message);
755 if (structure) {
756 str = gst_structure_to_string (structure);
757 structure_slen = strlen (str);
758 } else {
759 str = NULL;
760 structure_slen = 0;
761 }
762 size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
763 strlen (name) + 1 + structure_slen + 1;
764 if (!gst_byte_writer_put_uint32_le (&bw, size))
765 goto write_failed;
766
767 type = GST_MESSAGE_TYPE (message);
768 if (!gst_byte_writer_put_uint32_le (&bw, type))
769 goto write_failed;
770 size -= sizeof (type);
771
772 eseqnum = GST_EVENT_SEQNUM (event);
773 if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
774 goto write_failed;
775 size -= sizeof (eseqnum);
776
777 mseqnum = GST_MESSAGE_SEQNUM (message);
778 if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
779 goto write_failed;
780 size -= sizeof (mseqnum);
781
782 if (!gst_byte_writer_put_uint32_le (&bw, slen))
783 goto write_failed;
784 size -= sizeof (slen);
785
786 if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
787 goto write_failed;
788 size -= slen;
789
790 if (str) {
791 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
792 goto write_failed;
793 } else {
794 if (!gst_byte_writer_put_uint8 (&bw, 0))
795 goto write_failed;
796 }
797
798 if (!write_byte_writer_to_fd (comm, &bw))
799 goto write_failed;
800
801 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
802 GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
803 COMM_REQUEST_TYPE_EVENT))
804 goto write_failed;
805
806 ret = ret32;
807
808 done:
809 g_mutex_unlock (&comm->mutex);
810 gst_byte_writer_reset (&bw);
811 g_free (str);
812 if (message)
813 gst_message_unref (message);
814 return ret;
815
816 write_failed:
817 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
818 ("Failed to write to socket"));
819 ret = FALSE;
820 goto done;
821 }
822
823 static GstEvent *
gst_ipc_pipeline_comm_read_sink_message_event(GstIpcPipelineComm * comm,guint32 size)824 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
825 guint32 size)
826 {
827 GstMessage *message;
828 GstEvent *event = NULL;
829 gchar *end = NULL;
830 GstStructure *structure;
831 guint32 type, eseqnum, mseqnum, slen;
832 const char *name;
833 guint32 mapped_size = size;
834 const guint8 *payload;
835
836 /* this should not be called if we don't have enough yet */
837 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
838 g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
839
840 payload = gst_adapter_map (comm->adapter, mapped_size);
841 if (!payload)
842 return NULL;
843 memcpy (&type, payload, sizeof (type));
844 payload += sizeof (type);
845 size -= sizeof (type);
846 if (size == 0)
847 goto done;
848
849 memcpy (&eseqnum, payload, sizeof (eseqnum));
850 payload += sizeof (eseqnum);
851 size -= sizeof (eseqnum);
852 if (size == 0)
853 goto done;
854
855 memcpy (&mseqnum, payload, sizeof (mseqnum));
856 payload += sizeof (mseqnum);
857 size -= sizeof (mseqnum);
858 if (size == 0)
859 goto done;
860
861 memcpy (&slen, payload, sizeof (slen));
862 payload += sizeof (slen);
863 size -= sizeof (slen);
864 if (size == 0)
865 goto done;
866
867 if (payload[slen - 1])
868 goto done;
869 name = (const char *) payload;
870 payload += slen;
871 size -= slen;
872
873 if ((payload)[size - 1]) {
874 goto done;
875 }
876 if (*payload) {
877 structure = gst_structure_from_string ((const char *) payload, &end);
878 } else {
879 structure = NULL;
880 }
881
882 message =
883 gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
884 gst_message_set_seqnum (message, mseqnum);
885 event = gst_event_new_sink_message (name, message);
886 gst_event_set_seqnum (event, eseqnum);
887 gst_message_unref (message);
888
889 done:
890 gst_adapter_unmap (comm->adapter);
891 gst_adapter_flush (comm->adapter, mapped_size);
892 return event;
893 }
894
895 gboolean
gst_ipc_pipeline_comm_write_event_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstEvent * event)896 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
897 gboolean upstream, GstEvent * event)
898 {
899 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
900 gboolean ret;
901 guint32 type, size, ret32 = TRUE, seqnum, slen;
902 char *str = NULL;
903 const GstStructure *structure;
904 GstByteWriter bw;
905
906 /* we special case sink-message event as gst can't serialize/de-serialize it */
907 if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
908 return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
909
910 g_mutex_lock (&comm->mutex);
911 ++comm->send_id;
912
913 GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
914 comm->send_id, event);
915
916 gst_byte_writer_init (&bw);
917 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
918 goto write_failed;
919 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
920 goto write_failed;
921 structure = gst_event_get_structure (event);
922 if (structure) {
923
924 if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
925 GstStructure *s = gst_structure_copy (structure);
926 gst_structure_remove_field (s, "stream");
927 str = gst_structure_to_string (s);
928 gst_structure_free (s);
929 } else {
930 str = gst_structure_to_string (structure);
931 }
932
933 slen = strlen (str);
934 } else {
935 str = NULL;
936 slen = 0;
937 }
938 size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
939 if (!gst_byte_writer_put_uint32_le (&bw, size))
940 goto write_failed;
941
942 type = GST_EVENT_TYPE (event);
943 if (!gst_byte_writer_put_uint32_le (&bw, type))
944 goto write_failed;
945
946 seqnum = GST_EVENT_SEQNUM (event);
947 if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
948 goto write_failed;
949
950 if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
951 goto write_failed;
952
953 if (str) {
954 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
955 goto write_failed;
956 } else {
957 if (!gst_byte_writer_put_uint8 (&bw, 0))
958 goto write_failed;
959 }
960
961 if (!write_byte_writer_to_fd (comm, &bw))
962 goto write_failed;
963
964 /* Upstream events get serialized, this is required to send seeks only
965 * one at a time. */
966 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
967 (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
968 ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
969 goto write_failed;
970 ret = ret32;
971
972 done:
973 g_mutex_unlock (&comm->mutex);
974 g_free (str);
975 gst_byte_writer_reset (&bw);
976 return ret;
977
978 write_failed:
979 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
980 ("Failed to write to socket"));
981 ret = FALSE;
982 goto done;
983 }
984
985 static GstEvent *
gst_ipc_pipeline_comm_read_event(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)986 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
987 gboolean * upstream)
988 {
989 GstEvent *event = NULL;
990 gchar *end = NULL;
991 GstStructure *structure;
992 guint32 type, seqnum;
993 guint32 mapped_size = size;
994 const guint8 *payload;
995
996 /* this should not be called if we don't have enough yet */
997 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
998 g_return_val_if_fail (size >= sizeof (type), NULL);
999
1000 payload = gst_adapter_map (comm->adapter, mapped_size);
1001 if (!payload)
1002 return NULL;
1003
1004 memcpy (&type, payload, sizeof (type));
1005 payload += sizeof (type);
1006 size -= sizeof (type);
1007 if (size == 0)
1008 goto done;
1009
1010 memcpy (&seqnum, payload, sizeof (seqnum));
1011 payload += sizeof (seqnum);
1012 size -= sizeof (seqnum);
1013 if (size == 0)
1014 goto done;
1015
1016 *upstream = (*payload) ? TRUE : FALSE;
1017 payload += 1;
1018 size -= 1;
1019 if (size == 0)
1020 goto done;
1021
1022 if (payload[size - 1])
1023 goto done;
1024 if (*payload) {
1025 structure = gst_structure_from_string ((const char *) payload, &end);
1026 } else {
1027 structure = NULL;
1028 }
1029
1030 event = gst_event_new_custom (type, structure);
1031 gst_event_set_seqnum (event, seqnum);
1032
1033 done:
1034 gst_adapter_unmap (comm->adapter);
1035 gst_adapter_flush (comm->adapter, mapped_size);
1036 return event;
1037 }
1038
1039 gboolean
gst_ipc_pipeline_comm_write_query_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstQuery * query)1040 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1041 gboolean upstream, GstQuery * query)
1042 {
1043 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1044 gboolean ret;
1045 guint32 type, size, ret32 = TRUE, slen;
1046 char *str = NULL;
1047 const GstStructure *structure;
1048 GstByteWriter bw;
1049
1050 g_mutex_lock (&comm->mutex);
1051 ++comm->send_id;
1052
1053 GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1054 comm->send_id, query);
1055
1056 gst_byte_writer_init (&bw);
1057 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1058 goto write_failed;
1059 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1060 goto write_failed;
1061 structure = gst_query_get_structure (query);
1062 if (structure) {
1063 str = gst_structure_to_string (structure);
1064 slen = strlen (str);
1065 } else {
1066 str = NULL;
1067 slen = 0;
1068 }
1069 size = sizeof (type) + 1 + slen + 1;
1070 if (!gst_byte_writer_put_uint32_le (&bw, size))
1071 goto write_failed;
1072
1073 type = GST_QUERY_TYPE (query);
1074 if (!gst_byte_writer_put_uint32_le (&bw, type))
1075 goto write_failed;
1076
1077 if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1078 goto write_failed;
1079
1080 if (str) {
1081 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1082 goto write_failed;
1083 } else {
1084 if (!gst_byte_writer_put_uint8 (&bw, 0))
1085 goto write_failed;
1086 }
1087
1088 if (!write_byte_writer_to_fd (comm, &bw))
1089 goto write_failed;
1090
1091 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1092 GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1093 COMM_REQUEST_TYPE_QUERY))
1094 goto write_failed;
1095
1096 ret = ret32;
1097
1098 done:
1099 g_mutex_unlock (&comm->mutex);
1100 g_free (str);
1101 gst_byte_writer_reset (&bw);
1102 return ret;
1103
1104 write_failed:
1105 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1106 ("Failed to write to socket"));
1107 ret = FALSE;
1108 goto done;
1109 }
1110
1111 static GstQuery *
gst_ipc_pipeline_comm_read_query(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)1112 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1113 gboolean * upstream)
1114 {
1115 GstQuery *query = NULL;
1116 gchar *end = NULL;
1117 GstStructure *structure;
1118 guint32 type;
1119 guint32 mapped_size = size;
1120 const guint8 *payload;
1121
1122 /* this should not be called if we don't have enough yet */
1123 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1124 g_return_val_if_fail (size >= sizeof (type), NULL);
1125
1126 payload = gst_adapter_map (comm->adapter, mapped_size);
1127 if (!payload)
1128 return NULL;
1129
1130 memcpy (&type, payload, sizeof (type));
1131 payload += sizeof (type);
1132 size -= sizeof (type);
1133 if (size == 0)
1134 goto done;
1135
1136 *upstream = (*payload) ? TRUE : FALSE;
1137 payload += 1;
1138 size -= 1;
1139 if (size == 0)
1140 goto done;
1141
1142 if (payload[size - 1])
1143 goto done;
1144 if (*payload) {
1145 structure = gst_structure_from_string ((const char *) payload, &end);
1146 } else {
1147 structure = NULL;
1148 }
1149
1150 query = gst_query_new_custom (type, structure);
1151
1152 /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1153 This does not play well with the serialization/deserialization system,
1154 which will give us a non-NULL GstCaps which has a value of NULL. This
1155 in turn wreaks havoc with any code that tests whether filter is NULL
1156 (which basically means, am I being given an optional GstCaps ?).
1157 So we look for non-NULL GstCaps which have NULL contents, and replace
1158 them with NULL instead. */
1159 if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1160 GstCaps *filter;
1161 gst_query_parse_caps (query, &filter);
1162 if (filter
1163 && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1164 "NULL")) {
1165 gst_query_unref (query);
1166 query = gst_query_new_caps (NULL);
1167 }
1168 }
1169
1170 done:
1171 gst_adapter_unmap (comm->adapter);
1172 gst_adapter_flush (comm->adapter, mapped_size);
1173 return query;
1174 }
1175
1176 GstStateChangeReturn
gst_ipc_pipeline_comm_write_state_change_to_fd(GstIpcPipelineComm * comm,GstStateChange transition)1177 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1178 GstStateChange transition)
1179 {
1180 const unsigned char payload_type =
1181 GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1182 GstStateChangeReturn ret;
1183 guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1184 GstByteWriter bw;
1185
1186 g_mutex_lock (&comm->mutex);
1187 ++comm->send_id;
1188
1189 GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1190 comm->send_id,
1191 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1192 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1193
1194 gst_byte_writer_init (&bw);
1195 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1196 goto write_failed;
1197 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1198 goto write_failed;
1199 size = sizeof (transition);
1200 if (!gst_byte_writer_put_uint32_le (&bw, size))
1201 goto write_failed;
1202 if (!gst_byte_writer_put_uint32_le (&bw, transition))
1203 goto write_failed;
1204
1205 if (!write_byte_writer_to_fd (comm, &bw))
1206 goto write_failed;
1207
1208 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1209 ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1210 goto write_failed;
1211 ret = ret32;
1212
1213 done:
1214 g_mutex_unlock (&comm->mutex);
1215 gst_byte_writer_reset (&bw);
1216 return ret;
1217
1218 write_failed:
1219 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1220 ("Failed to write to socket"));
1221 ret = GST_STATE_CHANGE_FAILURE;
1222 goto done;
1223 }
1224
1225 static gboolean
is_valid_state_change(GstStateChange transition)1226 is_valid_state_change (GstStateChange transition)
1227 {
1228 if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1229 return TRUE;
1230 if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1231 return TRUE;
1232 if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1233 return TRUE;
1234 if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1235 return TRUE;
1236 if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1237 return TRUE;
1238 if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1239 return TRUE;
1240 if (GST_STATE_TRANSITION_CURRENT (transition) ==
1241 GST_STATE_TRANSITION_NEXT (transition))
1242 return TRUE;
1243 return FALSE;
1244 }
1245
1246 static gboolean
gst_ipc_pipeline_comm_read_state_change(GstIpcPipelineComm * comm,guint32 size,guint32 * transition)1247 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1248 guint32 size, guint32 * transition)
1249 {
1250 guint32 mapped_size = size;
1251 const guint8 *payload;
1252
1253 /* this should not be called if we don't have enough yet */
1254 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1255 g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1256
1257 payload = gst_adapter_map (comm->adapter, size);
1258 if (!payload)
1259 return FALSE;
1260 memcpy (transition, payload, sizeof (*transition));
1261 gst_adapter_unmap (comm->adapter);
1262 gst_adapter_flush (comm->adapter, mapped_size);
1263 return is_valid_state_change (*transition);
1264 }
1265
1266 void
gst_ipc_pipeline_comm_write_state_lost_to_fd(GstIpcPipelineComm * comm)1267 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1268 {
1269 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1270 guint32 size;
1271 GstByteWriter bw;
1272
1273 g_mutex_lock (&comm->mutex);
1274 ++comm->send_id;
1275
1276 GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1277 gst_byte_writer_init (&bw);
1278 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1279 goto write_failed;
1280 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1281 goto write_failed;
1282 size = 0;
1283 if (!gst_byte_writer_put_uint32_le (&bw, size))
1284 goto write_failed;
1285
1286 if (!write_byte_writer_to_fd (comm, &bw))
1287 goto write_failed;
1288
1289 done:
1290 g_mutex_unlock (&comm->mutex);
1291 gst_byte_writer_reset (&bw);
1292 return;
1293
1294 write_failed:
1295 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1296 ("Failed to write to socket"));
1297 goto done;
1298 }
1299
1300 static gboolean
gst_ipc_pipeline_comm_read_state_lost(GstIpcPipelineComm * comm,guint32 size)1301 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
1302 {
1303 /* no payload */
1304 return TRUE;
1305 }
1306
1307 static gboolean
gst_ipc_pipeline_comm_write_gerror_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1308 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1309 GstMessage * message)
1310 {
1311 const unsigned char payload_type =
1312 GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1313 gboolean ret;
1314 guint32 code, size, ret32 = TRUE;
1315 char *str = NULL;
1316 GError *error;
1317 char *extra_message;
1318 const char *domain_string;
1319 unsigned char msgtype;
1320 GstByteWriter bw;
1321
1322 g_mutex_lock (&comm->mutex);
1323 ++comm->send_id;
1324
1325 if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1326 gst_message_parse_error (message, &error, &extra_message);
1327 msgtype = 2;
1328 } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1329 gst_message_parse_warning (message, &error, &extra_message);
1330 msgtype = 1;
1331 } else {
1332 gst_message_parse_info (message, &error, &extra_message);
1333 msgtype = 0;
1334 }
1335 code = error->code;
1336 domain_string = g_quark_to_string (error->domain);
1337 GST_TRACE_OBJECT (comm->element,
1338 "Writing error %u: domain %s, code %u, message %s, extra message %s",
1339 comm->send_id, domain_string, error->code, error->message, extra_message);
1340
1341 gst_byte_writer_init (&bw);
1342 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1343 goto write_failed;
1344 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1345 goto write_failed;
1346
1347 size = sizeof (size);
1348 size += 1;
1349 size += strlen (domain_string) + 1;
1350 size += sizeof (code);
1351 size += sizeof (size);
1352 size += error->message ? strlen (error->message) + 1 : 0;
1353 size += sizeof (size);
1354 size += extra_message ? strlen (extra_message) + 1 : 0;
1355
1356 if (!gst_byte_writer_put_uint32_le (&bw, size))
1357 goto write_failed;
1358
1359 if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1360 goto write_failed;
1361 size = strlen (domain_string) + 1;
1362 if (!gst_byte_writer_put_uint32_le (&bw, size))
1363 goto write_failed;
1364 if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1365 goto write_failed;
1366 if (!gst_byte_writer_put_uint32_le (&bw, code))
1367 goto write_failed;
1368 size = error->message ? strlen (error->message) + 1 : 0;
1369 if (!gst_byte_writer_put_uint32_le (&bw, size))
1370 goto write_failed;
1371 if (error->message) {
1372 if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1373 goto write_failed;
1374 }
1375 size = extra_message ? strlen (extra_message) + 1 : 0;
1376 if (!gst_byte_writer_put_uint32_le (&bw, size))
1377 goto write_failed;
1378 if (extra_message) {
1379 if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1380 goto write_failed;
1381 }
1382
1383 if (!write_byte_writer_to_fd (comm, &bw))
1384 goto write_failed;
1385
1386 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1387 ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1388 goto write_failed;
1389
1390 ret = ret32;
1391
1392 done:
1393 g_mutex_unlock (&comm->mutex);
1394 g_free (str);
1395 g_error_free (error);
1396 g_free (extra_message);
1397 gst_byte_writer_reset (&bw);
1398 return ret;
1399
1400 write_failed:
1401 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1402 ("Failed to write to socket"));
1403 ret = FALSE;
1404 goto done;
1405 }
1406
1407 static GstMessage *
gst_ipc_pipeline_comm_read_gerror_message(GstIpcPipelineComm * comm,guint32 size)1408 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1409 guint32 size)
1410 {
1411 GstMessage *message = NULL;
1412 guint32 code;
1413 GQuark domain;
1414 const char *msg, *extra_message;
1415 GError *error;
1416 unsigned char msgtype;
1417 guint32 mapped_size = size;
1418 const guint8 *payload;
1419
1420 /* this should not be called if we don't have enough yet */
1421 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1422 g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1423 NULL);
1424
1425 payload = gst_adapter_map (comm->adapter, mapped_size);
1426 if (!payload)
1427 return NULL;
1428 msgtype = *payload++;
1429 memcpy (&size, payload, sizeof (size));
1430 payload += sizeof (size);
1431 if (payload[size - 1])
1432 goto done;
1433 domain = g_quark_from_string ((const char *) payload);
1434 payload += size;
1435
1436 memcpy (&code, payload, sizeof (code));
1437 payload += sizeof (code);
1438
1439 memcpy (&size, payload, sizeof (size));
1440 payload += sizeof (size);
1441 if (size) {
1442 if (payload[size - 1])
1443 goto done;
1444 msg = (const char *) payload;
1445 } else {
1446 msg = NULL;
1447 }
1448 payload += size;
1449
1450 memcpy (&size, payload, sizeof (size));
1451 payload += sizeof (size);
1452 if (size) {
1453 if (payload[size - 1])
1454 goto done;
1455 extra_message = (const char *) payload;
1456 } else {
1457 extra_message = NULL;
1458 }
1459 payload += size;
1460
1461 error = g_error_new (domain, code, "%s", msg);
1462 if (msgtype == 2)
1463 message =
1464 gst_message_new_error (GST_OBJECT (comm->element), error,
1465 extra_message);
1466 else if (msgtype == 1)
1467 message =
1468 gst_message_new_warning (GST_OBJECT (comm->element), error,
1469 extra_message);
1470 else
1471 message =
1472 gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1473 g_error_free (error);
1474
1475 done:
1476 gst_adapter_unmap (comm->adapter);
1477 gst_adapter_flush (comm->adapter, mapped_size);
1478
1479 return message;
1480 }
1481
1482 gboolean
gst_ipc_pipeline_comm_write_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1483 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1484 GstMessage * message)
1485 {
1486 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1487 gboolean ret;
1488 guint32 type, size, ret32 = TRUE, slen;
1489 char *str = NULL;
1490 const GstStructure *structure;
1491 GstByteWriter bw;
1492
1493 /* we special case error as gst can't serialize/de-serialize it */
1494 if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1495 || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1496 || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1497 return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1498
1499 g_mutex_lock (&comm->mutex);
1500 ++comm->send_id;
1501
1502 GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1503 comm->send_id, message);
1504
1505 gst_byte_writer_init (&bw);
1506 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1507 goto write_failed;
1508 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1509 goto write_failed;
1510 structure = gst_message_get_structure (message);
1511 if (structure) {
1512 str = gst_structure_to_string (structure);
1513 slen = strlen (str);
1514 } else {
1515 str = NULL;
1516 slen = 0;
1517 }
1518 size = sizeof (type) + slen + 1;
1519 if (!gst_byte_writer_put_uint32_le (&bw, size))
1520 goto write_failed;
1521
1522 type = GST_MESSAGE_TYPE (message);
1523 if (!gst_byte_writer_put_uint32_le (&bw, type))
1524 goto write_failed;
1525 size -= sizeof (type);
1526 if (str) {
1527 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1528 goto write_failed;
1529 } else {
1530 if (!gst_byte_writer_put_uint8 (&bw, 0))
1531 goto write_failed;
1532 }
1533
1534 if (!write_byte_writer_to_fd (comm, &bw))
1535 goto write_failed;
1536
1537 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1538 ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1539 goto write_failed;
1540
1541 ret = ret32;
1542
1543 done:
1544 g_mutex_unlock (&comm->mutex);
1545 g_free (str);
1546 gst_byte_writer_reset (&bw);
1547 return ret;
1548
1549 write_failed:
1550 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1551 ("Failed to write to socket"));
1552 ret = FALSE;
1553 goto done;
1554 }
1555
1556 static GstMessage *
gst_ipc_pipeline_comm_read_message(GstIpcPipelineComm * comm,guint32 size)1557 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1558 {
1559 GstMessage *message = NULL;
1560 gchar *end = NULL;
1561 GstStructure *structure;
1562 guint32 type;
1563 guint32 mapped_size = size;
1564 const guint8 *payload;
1565
1566 /* this should not be called if we don't have enough yet */
1567 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1568 g_return_val_if_fail (size >= sizeof (type), NULL);
1569
1570 payload = gst_adapter_map (comm->adapter, mapped_size);
1571 if (!payload)
1572 return NULL;
1573 memcpy (&type, payload, sizeof (type));
1574 payload += sizeof (type);
1575 size -= sizeof (type);
1576 if (size == 0)
1577 goto done;
1578
1579 if (payload[size - 1])
1580 goto done;
1581 if (*payload) {
1582 structure = gst_structure_from_string ((const char *) payload, &end);
1583 } else {
1584 structure = NULL;
1585 }
1586
1587 message =
1588 gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
1589
1590 done:
1591 gst_adapter_unmap (comm->adapter);
1592 gst_adapter_flush (comm->adapter, mapped_size);
1593
1594 return message;
1595 }
1596
1597 void
gst_ipc_pipeline_comm_init(GstIpcPipelineComm * comm,GstElement * element)1598 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1599 {
1600 g_mutex_init (&comm->mutex);
1601 comm->element = element;
1602 comm->fdin = comm->fdout = -1;
1603 comm->ack_time = DEFAULT_ACK_TIME;
1604 comm->waiting_ids =
1605 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1606 (GDestroyNotify) comm_request_free);
1607 comm->adapter = gst_adapter_new ();
1608 comm->poll = gst_poll_new (TRUE);
1609 gst_poll_fd_init (&comm->pollFDin);
1610 }
1611
1612 void
gst_ipc_pipeline_comm_clear(GstIpcPipelineComm * comm)1613 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1614 {
1615 g_hash_table_destroy (comm->waiting_ids);
1616 gst_object_unref (comm->adapter);
1617 gst_poll_free (comm->poll);
1618 g_mutex_clear (&comm->mutex);
1619 }
1620
1621 static void
cancel_request(gpointer key,gpointer value,gpointer user_data,GstFlowReturn fret)1622 cancel_request (gpointer key, gpointer value, gpointer user_data,
1623 GstFlowReturn fret)
1624 {
1625 GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1626 guint32 id = GPOINTER_TO_INT (key);
1627 CommRequest *req = (CommRequest *) value;
1628
1629 GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1630 req->type);
1631 req->ret = fret;
1632 req->replied = TRUE;
1633 g_cond_signal (&req->cond);
1634 }
1635
1636 static void
cancel_request_error(gpointer key,gpointer value,gpointer user_data)1637 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1638 {
1639 CommRequest *req = (CommRequest *) value;
1640 GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1641
1642 cancel_request (key, value, user_data, fret);
1643 }
1644
1645 void
gst_ipc_pipeline_comm_cancel(GstIpcPipelineComm * comm,gboolean cleanup)1646 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1647 {
1648 g_mutex_lock (&comm->mutex);
1649 g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
1650 if (cleanup) {
1651 g_hash_table_unref (comm->waiting_ids);
1652 comm->waiting_ids =
1653 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1654 (GDestroyNotify) comm_request_free);
1655 }
1656 g_mutex_unlock (&comm->mutex);
1657 }
1658
1659 static gboolean
set_field(GQuark field_id,const GValue * value,gpointer user_data)1660 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1661 {
1662 GstStructure *structure = user_data;
1663
1664 gst_structure_id_set_value (structure, field_id, value);
1665
1666 return TRUE;
1667 }
1668
1669 static gboolean
gst_ipc_pipeline_comm_reply_request(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret,GstQuery * query)1670 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1671 GstFlowReturn ret, GstQuery * query)
1672 {
1673 CommRequest *req;
1674
1675 req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1676 if (!req) {
1677 GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1678 return FALSE;
1679 }
1680
1681 GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1682 comm_request_ret_get_name (req->type, ret), req->id);
1683 req->replied = TRUE;
1684 req->ret = ret;
1685 if (query) {
1686 if (req->query) {
1687 /* We need to update the original query in place, as the caller
1688 will expect the object to be the same */
1689 GstStructure *structure = gst_query_writable_structure (req->query);
1690 gst_structure_remove_all_fields (structure);
1691 gst_structure_foreach (gst_query_get_structure (query), set_field,
1692 structure);
1693 } else {
1694 GST_WARNING_OBJECT (comm->element,
1695 "Got query reply, but no query was in the request");
1696 }
1697 }
1698 g_cond_signal (&req->cond);
1699 return TRUE;
1700 }
1701
1702 static gint
update_adapter(GstIpcPipelineComm * comm)1703 update_adapter (GstIpcPipelineComm * comm)
1704 {
1705 GstMemory *mem = NULL;
1706 GstBuffer *buf;
1707 GstMapInfo map;
1708 ssize_t sz;
1709 gint ret = 0;
1710
1711 again:
1712 /* update pollFDin if necessary (fdin changed or we lost our parent).
1713 * we do not allow a parent-less element to communicate with its peer
1714 * in order to avoid race conditions where the slave tries to change
1715 * the state of its parent pipeline while it is not yet added in that
1716 * pipeline. */
1717 if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
1718 if (comm->pollFDin.fd != -1) {
1719 GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1720 comm->pollFDin.fd);
1721 gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1722 gst_poll_fd_init (&comm->pollFDin);
1723 }
1724 if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1725 GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1726 comm->pollFDin.fd = comm->fdin;
1727 gst_poll_add_fd (comm->poll, &comm->pollFDin);
1728 gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1729 }
1730 }
1731
1732 /* wait for activity on fdin or a flush */
1733 if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1734 if (errno == EAGAIN)
1735 goto again;
1736 /* error out, unless interrupted or flushing */
1737 if (errno != EINTR)
1738 ret = (errno == EBUSY) ? 2 : 1;
1739 }
1740
1741 /* read from fdin if possible and push data to our adapter */
1742 if (comm->pollFDin.fd >= 0
1743 && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
1744 if (!mem)
1745 mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1746
1747 gst_memory_map (mem, &map, GST_MAP_WRITE);
1748 sz = read (comm->pollFDin.fd, map.data, map.size);
1749 gst_memory_unmap (mem, &map);
1750
1751 if (sz <= 0) {
1752 if (errno == EAGAIN)
1753 goto again;
1754 /* error out, unless interrupted */
1755 if (errno != EINTR)
1756 ret = 1;
1757 } else {
1758 gst_memory_resize (mem, 0, sz);
1759 buf = gst_buffer_new ();
1760 gst_buffer_append_memory (buf, mem);
1761 mem = NULL;
1762 GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1763 gst_adapter_push (comm->adapter, buf);
1764 }
1765 }
1766
1767 if (mem)
1768 gst_memory_unref (mem);
1769
1770 return ret;
1771 }
1772
1773 static gboolean
read_many(GstIpcPipelineComm * comm)1774 read_many (GstIpcPipelineComm * comm)
1775 {
1776 gboolean ret = TRUE;
1777 gsize available;
1778 const guint8 *payload;
1779
1780 while (1)
1781 switch (comm->state) {
1782 case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1783 {
1784 guint8 type;
1785 guint32 mapped_size;
1786
1787 available = gst_adapter_available (comm->adapter);
1788 mapped_size = 1 + sizeof (gint32) * 2;
1789 if (available < mapped_size)
1790 goto done;
1791
1792 payload = gst_adapter_map (comm->adapter, mapped_size);
1793 type = *payload++;
1794 g_mutex_lock (&comm->mutex);
1795 memcpy (&comm->id, payload, sizeof (guint32));
1796 memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1797 g_mutex_unlock (&comm->mutex);
1798 gst_adapter_unmap (comm->adapter);
1799 gst_adapter_flush (comm->adapter, mapped_size);
1800 GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1801 comm->id, type, comm->payload_length);
1802 switch (type) {
1803 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1804 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1805 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1806 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1807 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1808 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1809 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1810 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1811 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1812 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1813 GST_TRACE_OBJECT (comm->element, "switching to state %s",
1814 gst_ipc_pipeline_comm_data_type_get_name (type));
1815 comm->state = type;
1816 break;
1817 default:
1818 goto out_of_sync;
1819 }
1820 break;
1821 }
1822 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1823 {
1824 const guint8 *rets;
1825 guint32 ret32;
1826
1827 available = gst_adapter_available (comm->adapter);
1828 if (available < comm->payload_length)
1829 goto done;
1830
1831 if (available < sizeof (guint32))
1832 goto ack_failed;
1833
1834 rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1835 memcpy (&ret32, rets, sizeof (ret32));
1836 gst_adapter_unmap (comm->adapter);
1837 gst_adapter_flush (comm->adapter, sizeof (guint32));
1838 GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1839 gst_flow_get_name (ret32), comm->id);
1840
1841 g_mutex_lock (&comm->mutex);
1842 gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1843 g_mutex_unlock (&comm->mutex);
1844
1845 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1846 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1847 break;
1848 }
1849 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1850 {
1851 GstQuery *query = NULL;
1852 gboolean qret;
1853
1854 available = gst_adapter_available (comm->adapter);
1855 if (available < comm->payload_length)
1856 goto done;
1857
1858 qret =
1859 gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1860 &query);
1861
1862 GST_TRACE_OBJECT (comm->element,
1863 "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
1864 query);
1865
1866 g_mutex_lock (&comm->mutex);
1867 gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1868 g_mutex_unlock (&comm->mutex);
1869
1870 gst_query_unref (query);
1871
1872 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1873 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1874 break;
1875 }
1876 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1877 {
1878 GstBuffer *buf;
1879
1880 available = gst_adapter_available (comm->adapter);
1881 if (available < comm->payload_length)
1882 goto done;
1883
1884 buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1885 if (!buf)
1886 goto buffer_failed;
1887
1888 /* set caps and push */
1889 GST_TRACE_OBJECT (comm->element,
1890 "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1891 ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1892 ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1893 ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1894 GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1895 GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1896 GST_BUFFER_FLAGS (buf));
1897
1898 gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1899 GINT_TO_POINTER (comm->id), NULL);
1900
1901 if (comm->on_buffer)
1902 (*comm->on_buffer) (comm->id, buf, comm->user_data);
1903
1904 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1905 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1906 break;
1907 }
1908 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1909 {
1910 GstEvent *event;
1911 gboolean upstream;
1912
1913 available = gst_adapter_available (comm->adapter);
1914 if (available < comm->payload_length)
1915 goto done;
1916
1917 event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1918 &upstream);
1919 if (!event)
1920 goto event_failed;
1921
1922 GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1923 event, gst_event_type_get_name (event->type));
1924
1925 gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1926 GINT_TO_POINTER (comm->id), NULL);
1927
1928 if (comm->on_event)
1929 (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1930
1931 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1932 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1933 break;
1934 }
1935 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1936 {
1937 GstEvent *event;
1938
1939 available = gst_adapter_available (comm->adapter);
1940 if (available < comm->payload_length)
1941 goto done;
1942
1943 event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1944 comm->payload_length);
1945 if (!event)
1946 goto event_failed;
1947
1948 GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1949 event);
1950
1951 gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1952 GINT_TO_POINTER (comm->id), NULL);
1953
1954 if (comm->on_event)
1955 (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1956
1957 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1958 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1959 break;
1960 }
1961 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1962 {
1963 GstQuery *query;
1964 gboolean upstream;
1965
1966 available = gst_adapter_available (comm->adapter);
1967 if (available < comm->payload_length)
1968 goto done;
1969
1970 query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
1971 &upstream);
1972 if (!query)
1973 goto query_failed;
1974
1975 GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
1976 query, gst_query_type_get_name (query->type));
1977
1978 gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
1979 GINT_TO_POINTER (comm->id), NULL);
1980
1981 if (comm->on_query)
1982 (*comm->on_query) (comm->id, query, upstream, comm->user_data);
1983
1984 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1985 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1986 break;
1987 }
1988 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1989 {
1990 guint32 transition;
1991
1992 available = gst_adapter_available (comm->adapter);
1993 if (available < comm->payload_length)
1994 goto done;
1995
1996 if (!gst_ipc_pipeline_comm_read_state_change (comm,
1997 comm->payload_length, &transition))
1998 goto state_change_failed;
1999
2000 GST_TRACE_OBJECT (comm->element,
2001 "deserialized state change request: %s -> %s",
2002 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2003 (transition)),
2004 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2005 (transition)));
2006
2007 if (comm->on_state_change)
2008 (*comm->on_state_change) (comm->id, transition, comm->user_data);
2009
2010 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2011 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2012 break;
2013 }
2014 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2015 {
2016 available = gst_adapter_available (comm->adapter);
2017 if (available < comm->payload_length)
2018 goto done;
2019
2020 if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2021 goto event_failed;
2022
2023 GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2024
2025 if (comm->on_state_lost)
2026 (*comm->on_state_lost) (comm->user_data);
2027
2028 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2029 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2030 break;
2031 }
2032 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2033 {
2034 GstMessage *message;
2035
2036 available = gst_adapter_available (comm->adapter);
2037 if (available < comm->payload_length)
2038 goto done;
2039
2040 message = gst_ipc_pipeline_comm_read_message (comm,
2041 comm->payload_length);
2042 if (!message)
2043 goto message_failed;
2044
2045 GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2046 message, gst_message_type_get_name (message->type));
2047
2048 if (comm->on_message)
2049 (*comm->on_message) (comm->id, message, comm->user_data);
2050
2051 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2052 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2053 break;
2054 }
2055 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2056 {
2057 GstMessage *message;
2058
2059 available = gst_adapter_available (comm->adapter);
2060 if (available < comm->payload_length)
2061 goto done;
2062
2063 message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2064 comm->payload_length);
2065 if (!message)
2066 goto message_failed;
2067
2068 GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2069 message, gst_message_type_get_name (message->type));
2070
2071 if (comm->on_message)
2072 (*comm->on_message) (comm->id, message, comm->user_data);
2073
2074 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2075 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2076 break;
2077 }
2078 }
2079
2080 done:
2081 return ret;
2082
2083 /* ERRORS */
2084 out_of_sync:
2085 {
2086 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2087 ("Socket out of sync"));
2088 ret = FALSE;
2089 goto done;
2090 }
2091 state_change_failed:
2092 {
2093 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2094 ("could not read state change from fd"));
2095 ret = FALSE;
2096 goto done;
2097 }
2098 ack_failed:
2099 {
2100 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2101 ("could not read ack from fd"));
2102 ret = FALSE;
2103 goto done;
2104 }
2105 buffer_failed:
2106 {
2107 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2108 ("could not read buffer from fd"));
2109 ret = FALSE;
2110 goto done;
2111 }
2112 event_failed:
2113 {
2114 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2115 ("could not read event from fd"));
2116 ret = FALSE;
2117 goto done;
2118 }
2119 message_failed:
2120 {
2121 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122 ("could not read message from fd"));
2123 ret = FALSE;
2124 goto done;
2125 }
2126 query_failed:
2127 {
2128 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129 ("could not read query from fd"));
2130 ret = FALSE;
2131 goto done;
2132 }
2133 }
2134
2135 static gpointer
reader_thread(gpointer data)2136 reader_thread (gpointer data)
2137 {
2138 GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2139 gboolean running = TRUE;
2140 gint ret = 0;
2141
2142 while (running) {
2143 ret = update_adapter (comm);
2144 switch (ret) {
2145 case 1:
2146 GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2147 ("Failed to read from socket"));
2148 running = FALSE;
2149 break;
2150 case 2:
2151 GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2152 running = FALSE;
2153 break;
2154 default:
2155 read_many (comm);
2156 break;
2157 }
2158 }
2159
2160 GST_INFO_OBJECT (comm->element, "Reader thread ending");
2161 return NULL;
2162 }
2163
2164 gboolean
gst_ipc_pipeline_comm_start_reader_thread(GstIpcPipelineComm * comm,void (* on_buffer)(guint32,GstBuffer *,gpointer),void (* on_event)(guint32,GstEvent *,gboolean,gpointer),void (* on_query)(guint32,GstQuery *,gboolean,gpointer),void (* on_state_change)(guint32,GstStateChange,gpointer),void (* on_state_lost)(gpointer),void (* on_message)(guint32,GstMessage *,gpointer),gpointer user_data)2165 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2166 void (*on_buffer) (guint32, GstBuffer *, gpointer),
2167 void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2168 void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2169 void (*on_state_change) (guint32, GstStateChange, gpointer),
2170 void (*on_state_lost) (gpointer),
2171 void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2172 {
2173 if (comm->reader_thread)
2174 return FALSE;
2175
2176 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2177 comm->on_buffer = on_buffer;
2178 comm->on_event = on_event;
2179 comm->on_query = on_query;
2180 comm->on_state_change = on_state_change;
2181 comm->on_state_lost = on_state_lost;
2182 comm->on_message = on_message;
2183 comm->user_data = user_data;
2184 gst_poll_set_flushing (comm->poll, FALSE);
2185 comm->reader_thread =
2186 g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
2187 return TRUE;
2188 }
2189
2190 void
gst_ipc_pipeline_comm_stop_reader_thread(GstIpcPipelineComm * comm)2191 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2192 {
2193 if (!comm->reader_thread)
2194 return;
2195
2196 gst_poll_set_flushing (comm->poll, TRUE);
2197 g_thread_join (comm->reader_thread);
2198 comm->reader_thread = NULL;
2199 }
2200
2201 static gchar *
gst_value_serialize_event(const GValue * value)2202 gst_value_serialize_event (const GValue * value)
2203 {
2204 const GstStructure *structure;
2205 GstEvent *ev;
2206 gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2207 GValue val = G_VALUE_INIT;
2208
2209 ev = g_value_get_boxed (value);
2210
2211 g_value_init (&val, gst_event_type_get_type ());
2212 g_value_set_enum (&val, ev->type);
2213 type = gst_value_serialize (&val);
2214 g_value_unset (&val);
2215
2216 g_value_init (&val, G_TYPE_UINT64);
2217 g_value_set_uint64 (&val, ev->timestamp);
2218 ts = gst_value_serialize (&val);
2219 g_value_unset (&val);
2220
2221 g_value_init (&val, G_TYPE_UINT);
2222 g_value_set_uint (&val, ev->seqnum);
2223 seqnum = gst_value_serialize (&val);
2224 g_value_unset (&val);
2225
2226 g_value_init (&val, G_TYPE_INT64);
2227 g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2228 rt_offset = gst_value_serialize (&val);
2229 g_value_unset (&val);
2230
2231 structure = gst_event_get_structure (ev);
2232 str = gst_structure_to_string (structure);
2233 str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2234 g_strdelimit (str64, "=", '_');
2235 g_free (str);
2236
2237 s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
2238 NULL);
2239
2240 g_free (type);
2241 g_free (ts);
2242 g_free (seqnum);
2243 g_free (rt_offset);
2244 g_free (str64);
2245
2246 return s;
2247 }
2248
2249 static gboolean
gst_value_deserialize_event(GValue * dest,const gchar * s)2250 gst_value_deserialize_event (GValue * dest, const gchar * s)
2251 {
2252 GstEvent *ev = NULL;
2253 GValue val = G_VALUE_INIT;
2254 gboolean ret = FALSE;
2255 gchar **fields;
2256 gsize len;
2257
2258 fields = g_strsplit (s, ":", -1);
2259 if (g_strv_length (fields) != 5)
2260 goto wrong_length;
2261
2262 g_strdelimit (fields[4], "_", '=');
2263 g_base64_decode_inplace (fields[4], &len);
2264
2265 g_value_init (&val, gst_event_type_get_type ());
2266 if (!gst_value_deserialize (&val, fields[0]))
2267 goto fail;
2268 ev = gst_event_new_custom (g_value_get_enum (&val),
2269 gst_structure_new_from_string (fields[4]));
2270
2271 g_value_unset (&val);
2272 g_value_init (&val, G_TYPE_UINT64);
2273 if (!gst_value_deserialize (&val, fields[1]))
2274 goto fail;
2275 ev->timestamp = g_value_get_uint64 (&val);
2276
2277 g_value_unset (&val);
2278 g_value_init (&val, G_TYPE_UINT);
2279 if (!gst_value_deserialize (&val, fields[2]))
2280 goto fail;
2281 ev->seqnum = g_value_get_uint (&val);
2282
2283 g_value_unset (&val);
2284 g_value_init (&val, G_TYPE_INT64);
2285 if (!gst_value_deserialize (&val, fields[3]))
2286 goto fail;
2287 gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
2288
2289 g_value_take_boxed (dest, ev);
2290 ev = NULL;
2291 ret = TRUE;
2292
2293 fail:
2294 g_clear_pointer (&ev, gst_event_unref);
2295 g_value_unset (&val);
2296
2297 wrong_length:
2298 g_strfreev (fields);
2299 return ret;
2300 }
2301
/*
 * Registers gst_value_serialize_<_type>() and gst_value_deserialize_<_type>()
 * as the GstValue (de)serialization functions for the GType _gtype.  No
 * compare function is installed.  The table handed to gst_value_register()
 * has static storage; its type field is filled in at run time.
 */
#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)        \
  G_STMT_START {                                                \
    static GstValueTable value_table = {                        \
      0,                                                        \
      NULL,                                                     \
      gst_value_serialize_ ## _type,                            \
      gst_value_deserialize_ ## _type                           \
    };                                                          \
    value_table.type = (_gtype);                                \
    gst_value_register (&value_table);                          \
  } G_STMT_END
2310
2311 void
gst_ipc_pipeline_comm_plugin_init(void)2312 gst_ipc_pipeline_comm_plugin_init (void)
2313 {
2314 static volatile gsize once = 0;
2315
2316 if (g_once_init_enter (&once)) {
2317 GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2318 "ipc pipeline comm");
2319 QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2320 REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2321 g_once_init_leave (&once, (gsize) 1);
2322 }
2323 }
2324