1 /*
2 * Farstream - Farstream Shared Memory Stream Transmitter
3 *
4 * Copyright 2009 Collabora Ltd.
5 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
6 * Copyright 2009 Nokia Corp.
7 *
8 * fs-shm-stream-transmitter.c - A Farstream Shared memory stream transmitter
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 /**
27 * SECTION:fs-shm-stream-transmitter
28 * @short_description: A stream transmitter object for Shared Memory
29 *
30 * The name of this transmitter is "shm".
31 *
 * This transmitter is meant to send and receive data from another process
 * on the same system while minimizing the memory pressure associated with the
 * use of sockets.
35 *
36 * Two sockets are used to control the shared memory areas. One is used to
37 * send data and one to receive data. The receiver always connects to the
38 * sender. The sender socket must exist before the receiver connects to it.
39 *
 * Negotiating the paths of the sockets can happen in two ways. If the
 * create-local-candidates is True then the transmitter will generate the
 * path of the local candidate and use it as the ip field in #FsCandidate. The
 * transmitter will expect the path of the application's sender socket to be in
 * the "ip" field of the remote candidates #FsCandidate as well.
45 *
46 * Or alternatively, if create-local-candidates is false then
47 * the sender socket can be created by giving the transmitter a candidate
48 * with the path of the socket in the "ip" field of the #FsCandidate. This
49 * #FsCandidate can be given to the #FsStreamTransmitter in two ways, either
50 * by setting the #FsStreamTransmitter:preferred-local-candidates property
51 * or by calling the fs_stream_transmitter_force_remote_candidates() function.
52 * There can be only one single send socket per stream. When the send socket
53 * is ready to be connected to, #FsStreamTransmitter::new-local-candidate signal
54 * will be emitted.
55 *
56 * To connect the receive side to the other application, one must create a
57 * #FsCandidate with the path of the sender's socket in the "username" field.
58 * If the receiver can not connect to the sender,
59 * the fs_stream_transmitter_force_remote_candidates() call will fail.
60 */
61
62 #ifdef HAVE_CONFIG_H
63 #include "config.h"
64 #endif
65
66 #include "fs-shm-stream-transmitter.h"
67 #include "fs-shm-transmitter.h"
68
69 #include <farstream/fs-candidate.h>
70 #include <farstream/fs-conference.h>
71
72 #include <gst/gst.h>
73
74 #include <glib/gstdio.h>
75
76 #include <string.h>
77 #include <sys/types.h>
78
79 #ifdef HAVE_UNISTD_H
80 # include <unistd.h>
81 #endif
82
83 #include <stdlib.h>
84
/* The debug category is only declared here (extern); GST_DEBUG() and friends
 * in this file log through it by default. */
GST_DEBUG_CATEGORY_EXTERN (fs_shm_transmitter_debug);
#define GST_CAT_DEFAULT fs_shm_transmitter_debug
87
/* Signal ids: this class declares none of its own, only the terminator */
enum
{
  LAST_SIGNAL = 0
};

/* Property ids used by get_property()/set_property() */
enum
{
  PROP_0 = 0,
  PROP_SENDING = 1,
  PROP_PREFERRED_LOCAL_CANDIDATES = 2,
  PROP_CREATE_LOCAL_CANDIDATES = 3
};
102
103 struct _FsShmStreamTransmitterPrivate
104 {
105 /* We don't actually hold a ref to this,
106 * But since our parent FsStream can not exist without its parent
107 * FsSession, we should be safe
108 */
109 FsShmTransmitter *transmitter;
110
111 GList *preferred_local_candidates;
112
113 GMutex mutex;
114
115 /* Protected by the mutex */
116 gboolean sending;
117
118 /* Protected by the mutex */
119 FsCandidate **candidates;
120
121 /* Whether we create the local candidate ourselves or rely on the remote end
122 * to pass them to us as part of the candidate */
123 gboolean create_local_candidates;
124
125 /* temporary socket directy in case we made one */
126 gchar *socket_dir;
127
128 ShmSrc **shm_src;
129 ShmSink **shm_sink;
130 };
131
/* Fetches the private data that class_init registered for this type */
#define FS_SHM_STREAM_TRANSMITTER_GET_PRIVATE(o)                          \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_SHM_STREAM_TRANSMITTER,      \
      FsShmStreamTransmitterPrivate))

/* Take / release the per-instance mutex guarding the fields marked
 * "Protected by the mutex" in the private structure */
#define FS_SHM_STREAM_TRANSMITTER_LOCK(s)                                 \
  g_mutex_lock (&(s)->priv->mutex)
#define FS_SHM_STREAM_TRANSMITTER_UNLOCK(s)                               \
  g_mutex_unlock (&(s)->priv->mutex)
140
141 static void fs_shm_stream_transmitter_class_init (FsShmStreamTransmitterClass *klass);
142 static void fs_shm_stream_transmitter_init (FsShmStreamTransmitter *self);
143 static void fs_shm_stream_transmitter_dispose (GObject *object);
144 static void fs_shm_stream_transmitter_finalize (GObject *object);
145
146 static void fs_shm_stream_transmitter_get_property (GObject *object,
147 guint prop_id,
148 GValue *value,
149 GParamSpec *pspec);
150 static void fs_shm_stream_transmitter_set_property (GObject *object,
151 guint prop_id,
152 const GValue *value,
153 GParamSpec *pspec);
154
155 static gboolean fs_shm_stream_transmitter_force_remote_candidates (
156 FsStreamTransmitter *streamtransmitter, GList *candidates,
157 GError **error);
158 static gboolean fs_shm_stream_transmitter_gather_local_candidates (
159 FsStreamTransmitter *streamtransmitter,
160 GError **error);
161
162 static gboolean
163 fs_shm_stream_transmitter_add_sink (FsShmStreamTransmitter *self,
164 FsCandidate *candidate, GError **error);
165
166
167 static GObjectClass *parent_class = NULL;
168 // static guint signals[LAST_SIGNAL] = { 0 };
169
170 static GType type = 0;
171
172 GType
fs_shm_stream_transmitter_get_type(void)173 fs_shm_stream_transmitter_get_type (void)
174 {
175 return type;
176 }
177
178 GType
fs_shm_stream_transmitter_register_type(FsPlugin * module G_GNUC_UNUSED)179 fs_shm_stream_transmitter_register_type (FsPlugin *module G_GNUC_UNUSED)
180 {
181 static const GTypeInfo info = {
182 sizeof (FsShmStreamTransmitterClass),
183 NULL,
184 NULL,
185 (GClassInitFunc) fs_shm_stream_transmitter_class_init,
186 NULL,
187 NULL,
188 sizeof (FsShmStreamTransmitter),
189 0,
190 (GInstanceInitFunc) fs_shm_stream_transmitter_init
191 };
192
193 type = g_type_register_static (FS_TYPE_STREAM_TRANSMITTER,
194 "FsShmStreamTransmitter", &info, 0);
195
196 return type;
197 }
198
199 static void
fs_shm_stream_transmitter_class_init(FsShmStreamTransmitterClass * klass)200 fs_shm_stream_transmitter_class_init (FsShmStreamTransmitterClass *klass)
201 {
202 GObjectClass *gobject_class = (GObjectClass *) klass;
203 FsStreamTransmitterClass *streamtransmitterclass =
204 FS_STREAM_TRANSMITTER_CLASS (klass);
205 GParamSpec *pspec;
206
207 parent_class = g_type_class_peek_parent (klass);
208
209 gobject_class->set_property = fs_shm_stream_transmitter_set_property;
210 gobject_class->get_property = fs_shm_stream_transmitter_get_property;
211
212 streamtransmitterclass->force_remote_candidates =
213 fs_shm_stream_transmitter_force_remote_candidates;
214 streamtransmitterclass->gather_local_candidates =
215 fs_shm_stream_transmitter_gather_local_candidates;
216
217 g_object_class_override_property (gobject_class, PROP_SENDING, "sending");
218 g_object_class_override_property (gobject_class,
219 PROP_PREFERRED_LOCAL_CANDIDATES, "preferred-local-candidates");
220
221 pspec = g_param_spec_boolean ("create-local-candidates",
222 "CreateLocalCandidates",
223 "Whether the transmitter should automatically create local candidates",
224 FALSE,
225 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS);
226 g_object_class_install_property (gobject_class,
227 PROP_CREATE_LOCAL_CANDIDATES,
228 pspec);
229
230
231 gobject_class->dispose = fs_shm_stream_transmitter_dispose;
232 gobject_class->finalize = fs_shm_stream_transmitter_finalize;
233
234 g_type_class_add_private (klass, sizeof (FsShmStreamTransmitterPrivate));
235 }
236
237 static void
fs_shm_stream_transmitter_init(FsShmStreamTransmitter * self)238 fs_shm_stream_transmitter_init (FsShmStreamTransmitter *self)
239 {
240 /* member init */
241 self->priv = FS_SHM_STREAM_TRANSMITTER_GET_PRIVATE (self);
242
243 self->priv->sending = TRUE;
244
245 g_mutex_init (&self->priv->mutex);
246 }
247
248 static void
fs_shm_stream_transmitter_dispose(GObject * object)249 fs_shm_stream_transmitter_dispose (GObject *object)
250 {
251 FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
252 gint c; /* component_id */
253
254 for (c = 1; c <= self->priv->transmitter->components; c++)
255 {
256 if (self->priv->shm_src[c])
257 {
258 fs_shm_transmitter_check_shm_src (self->priv->transmitter,
259 self->priv->shm_src[c], NULL);
260 }
261 self->priv->shm_src[c] = NULL;
262
263 if (self->priv->shm_sink[c])
264 {
265 fs_shm_transmitter_check_shm_sink (self->priv->transmitter,
266 self->priv->shm_sink[c], NULL);
267 }
268 self->priv->shm_sink[c] = NULL;
269 }
270
271 if (self->priv->socket_dir != NULL)
272 g_rmdir (self->priv->socket_dir);
273 g_free (self->priv->socket_dir);
274 self->priv->socket_dir = NULL;
275
276 parent_class->dispose (object);
277 }
278
279 static void
fs_shm_stream_transmitter_finalize(GObject * object)280 fs_shm_stream_transmitter_finalize (GObject *object)
281 {
282 FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
283
284 fs_candidate_list_destroy (self->priv->preferred_local_candidates);
285
286 g_free (self->priv->shm_src);
287 g_free (self->priv->shm_sink);
288 g_mutex_clear (&self->priv->mutex);
289
290 parent_class->finalize (object);
291 }
292
293 static void
fs_shm_stream_transmitter_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)294 fs_shm_stream_transmitter_get_property (GObject *object,
295 guint prop_id,
296 GValue *value,
297 GParamSpec *pspec)
298 {
299 FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
300
301 switch (prop_id)
302 {
303 case PROP_SENDING:
304 FS_SHM_STREAM_TRANSMITTER_LOCK (self);
305 g_value_set_boolean (value, self->priv->sending);
306 FS_SHM_STREAM_TRANSMITTER_UNLOCK (self);
307 break;
308 case PROP_PREFERRED_LOCAL_CANDIDATES:
309 g_value_set_boxed (value, self->priv->preferred_local_candidates);
310 break;
311 case PROP_CREATE_LOCAL_CANDIDATES:
312 g_value_set_boolean (value, self->priv->create_local_candidates);
313 break;
314 default:
315 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
316 break;
317 }
318 }
319
320 static void
fs_shm_stream_transmitter_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)321 fs_shm_stream_transmitter_set_property (GObject *object,
322 guint prop_id,
323 const GValue *value,
324 GParamSpec *pspec)
325 {
326 FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
327
328 switch (prop_id) {
329 case PROP_SENDING:
330 FS_SHM_STREAM_TRANSMITTER_LOCK (self);
331 self->priv->sending = g_value_get_boolean (value);
332 if (self->priv->shm_sink[1])
333 fs_shm_transmitter_sink_set_sending (self->priv->transmitter,
334 self->priv->shm_sink[1], self->priv->sending);
335 FS_SHM_STREAM_TRANSMITTER_UNLOCK (self);
336 break;
337 case PROP_PREFERRED_LOCAL_CANDIDATES:
338 self->priv->preferred_local_candidates = g_value_dup_boxed (value);
339 break;
340 case PROP_CREATE_LOCAL_CANDIDATES:
341 self->priv->create_local_candidates = g_value_get_boolean (value);
342 break;
343 default:
344 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
345 break;
346 }
347 }
348
349 static gboolean
fs_shm_stream_transmitter_build(FsShmStreamTransmitter * self,GError ** error)350 fs_shm_stream_transmitter_build (FsShmStreamTransmitter *self,
351 GError **error)
352 {
353 self->priv->shm_src = g_new0 (ShmSrc *,
354 self->priv->transmitter->components + 1);
355 self->priv->shm_sink = g_new0 (ShmSink *,
356 self->priv->transmitter->components + 1);
357
358 return TRUE;
359 }
360
361 static void
got_buffer_func(GstBuffer * buffer,guint component,gpointer data)362 got_buffer_func (GstBuffer *buffer, guint component, gpointer data)
363 {
364 FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER_CAST (data);
365
366 g_signal_emit_by_name (self, "known-source-packet-received", component,
367 buffer);
368 }
369
/*
 * Callback handed to fs_shm_transmitter_get_shm_sink(): builds a temporary
 * host candidate whose "ip" is @path, announces it with
 * "new-local-candidate" followed by "local-candidates-prepared" on the
 * stream transmitter (@data), then frees the candidate.
 */
static void
ready_cb (guint component, gchar *path, gpointer data)
{
  FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER_CAST (data);
  FsCandidate *local_candidate;

  local_candidate = fs_candidate_new (NULL, component,
      FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, path, 0);

  GST_DEBUG ("Emitting new local candidate with path %s", path);

  g_signal_emit_by_name (self, "new-local-candidate", local_candidate);
  g_signal_emit_by_name (self, "local-candidates-prepared");

  fs_candidate_destroy (local_candidate);
}
383
384 static void
connected_cb(guint component,gint id,gpointer data)385 connected_cb (guint component, gint id, gpointer data)
386 {
387 FsShmStreamTransmitter *self = data;
388
389 g_signal_emit_by_name (self, "state-changed", component,
390 FS_STREAM_STATE_READY);
391 }
392
393 static gboolean
fs_shm_stream_transmitter_add_sink(FsShmStreamTransmitter * self,FsCandidate * candidate,GError ** error)394 fs_shm_stream_transmitter_add_sink (FsShmStreamTransmitter *self,
395 FsCandidate *candidate, GError **error)
396 {
397 if (self->priv->create_local_candidates)
398 return TRUE;
399
400 if (!candidate->ip || !candidate->ip[0])
401 return TRUE;
402
403 if (self->priv->shm_sink[candidate->component_id])
404 {
405 if (fs_shm_transmitter_check_shm_sink (self->priv->transmitter,
406 self->priv->shm_sink[candidate->component_id], candidate->ip))
407 return TRUE;
408 self->priv->shm_sink[candidate->component_id] = NULL;
409 }
410
411 self->priv->shm_sink[candidate->component_id] =
412 fs_shm_transmitter_get_shm_sink (self->priv->transmitter,
413 candidate->component_id, candidate->ip, ready_cb, connected_cb,
414 self, error);
415
416 if (self->priv->shm_sink[candidate->component_id] == NULL)
417 return FALSE;
418
419 if (candidate->component_id == 1)
420 fs_shm_transmitter_sink_set_sending (self->priv->transmitter,
421 self->priv->shm_sink[candidate->component_id], self->priv->sending);
422
423 return TRUE;
424 }
425
426
427 static void
disconnected_cb(guint component,gint id,gpointer data)428 disconnected_cb (guint component, gint id, gpointer data)
429 {
430 FsShmStreamTransmitter *self = data;
431
432 g_signal_emit_by_name (self, "state-changed", component,
433 FS_STREAM_STATE_FAILED);
434 }
435
436 static gboolean
fs_shm_stream_transmitter_force_remote_candidate(FsShmStreamTransmitter * self,FsCandidate * candidate,GError ** error)437 fs_shm_stream_transmitter_force_remote_candidate (
438 FsShmStreamTransmitter *self, FsCandidate *candidate,
439 GError **error)
440 {
441 const gchar *path;
442 if (!fs_shm_stream_transmitter_add_sink (self, candidate, error))
443 return FALSE;
444
445 if (self->priv->create_local_candidates)
446 path = candidate->ip;
447 else
448 path = candidate->username;
449
450 if (path && path[0])
451 {
452 if (self->priv->shm_src[candidate->component_id])
453 {
454 if (fs_shm_transmitter_check_shm_src (self->priv->transmitter,
455 self->priv->shm_src[candidate->component_id], path))
456 return TRUE;
457 self->priv->shm_src[candidate->component_id] = NULL;
458 }
459
460 self->priv->shm_src[candidate->component_id] =
461 fs_shm_transmitter_get_shm_src (self->priv->transmitter,
462 candidate->component_id, path, got_buffer_func, disconnected_cb,
463 self, error);
464
465 if (self->priv->shm_src[candidate->component_id] == NULL)
466 return FALSE;
467 }
468
469 return TRUE;
470 }
471
472 /**
473 * fs_shm_stream_transmitter_force_remote_candidates
474 */
475
476 static gboolean
fs_shm_stream_transmitter_force_remote_candidates(FsStreamTransmitter * streamtransmitter,GList * candidates,GError ** error)477 fs_shm_stream_transmitter_force_remote_candidates (
478 FsStreamTransmitter *streamtransmitter, GList *candidates,
479 GError **error)
480 {
481 GList *item = NULL;
482 FsShmStreamTransmitter *self =
483 FS_SHM_STREAM_TRANSMITTER (streamtransmitter);
484
485 for (item = candidates; item; item = g_list_next (item))
486 {
487 FsCandidate *candidate = item->data;
488
489 if (candidate->component_id == 0 ||
490 candidate->component_id > self->priv->transmitter->components) {
491 g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
492 "The candidate passed has an invalid component id %u (not in [1,%u])",
493 candidate->component_id, self->priv->transmitter->components);
494 return FALSE;
495 }
496
497 if ((!candidate->ip || !candidate->ip[0]) &&
498 (!candidate->username || !candidate->username[0]))
499 {
500 g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
501 "The candidate does not have a SINK shm segment in its ip"
502 " or a SRC shm segment in its username");
503 return FALSE;
504 }
505 }
506
507 for (item = candidates; item; item = g_list_next (item))
508 if (!fs_shm_stream_transmitter_force_remote_candidate (self,
509 item->data, error))
510 return FALSE;
511
512
513 return TRUE;
514 }
515
516
517 FsShmStreamTransmitter *
fs_shm_stream_transmitter_newv(FsShmTransmitter * transmitter,guint n_parameters,GParameter * parameters,GError ** error)518 fs_shm_stream_transmitter_newv (FsShmTransmitter *transmitter,
519 guint n_parameters, GParameter *parameters, GError **error)
520 {
521 FsShmStreamTransmitter *streamtransmitter = NULL;
522
523 streamtransmitter = g_object_newv (FS_TYPE_SHM_STREAM_TRANSMITTER,
524 n_parameters, parameters);
525
526 if (!streamtransmitter) {
527 g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
528 "Could not build the stream transmitter");
529 return NULL;
530 }
531
532 streamtransmitter->priv->transmitter = transmitter;
533
534 if (!fs_shm_stream_transmitter_build (streamtransmitter, error)) {
535 g_object_unref (streamtransmitter);
536 return NULL;
537 }
538
539 return streamtransmitter;
540 }
541
542
543 static gboolean
fs_shm_stream_transmitter_gather_local_candidates(FsStreamTransmitter * streamtransmitter,GError ** error)544 fs_shm_stream_transmitter_gather_local_candidates (
545 FsStreamTransmitter *streamtransmitter,
546 GError **error)
547 {
548 FsShmStreamTransmitter *self =
549 FS_SHM_STREAM_TRANSMITTER (streamtransmitter);
550 GList *item;
551
552 if (self->priv->create_local_candidates)
553 {
554 guint c;
555 gchar *socket_dir;
556
557 socket_dir = g_build_filename (g_get_tmp_dir (),
558 "farstream-shm-XXXXXX", NULL);
559
560 if (g_mkdtemp (socket_dir) == NULL)
561 return FALSE;
562
563 self->priv->socket_dir = socket_dir;
564
565 for (c = 1; c <= self->priv->transmitter->components; c++)
566 {
567 gchar *path = g_strdup_printf ("%s/shm-sink-socket-%d", socket_dir, c);
568
569 self->priv->shm_sink[c] =
570 fs_shm_transmitter_get_shm_sink (self->priv->transmitter,
571 c, path, ready_cb, connected_cb, self, error);
572 g_free (path);
573
574 if (self->priv->shm_sink[c] == NULL)
575 return FALSE;
576
577 if (c == 1)
578 fs_shm_transmitter_sink_set_sending (self->priv->transmitter,
579 self->priv->shm_sink[c], self->priv->sending);
580 }
581
582 return TRUE;
583 }
584
585 for (item = self->priv->preferred_local_candidates;
586 item;
587 item = g_list_next (item))
588 {
589 FsCandidate *candidate = item->data;
590
591 if (candidate->ip && candidate->ip[0])
592 if (!fs_shm_stream_transmitter_add_sink (self, candidate, error))
593 return FALSE;
594 }
595
596 return TRUE;
597 }
598