1 /*
2  * Farstream - Farstream Shared Memory Stream Transmitter
3  *
4  * Copyright 2009 Collabora Ltd.
5  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
6  * Copyright 2009 Nokia Corp.
7  *
8  * fs-shm-stream-transmitter.c - A Farstream Shared memory stream transmitter
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License as published by the Free Software Foundation; either
13  * version 2.1 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Lesser General Public License for more details.
19  *
20  * You should have received a copy of the GNU Lesser General Public
21  * License along with this library; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
23  */
24 
25 
26 /**
27  * SECTION:fs-shm-stream-transmitter
28  * @short_description: A stream transmitter object for Shared Memory
29  *
30  * The name of this transmitter is "shm".
31  *
32  * This transmitter is meant to send and received the data from another process
33  * on the same system while minimizing the memory pressure associated with the
34  * use of sockets.
35  *
36  * Two sockets are used to control the shared memory areas. One is used to
37  * send data and one to receive data. The receiver always connects to the
38  * sender. The sender socket must exist before the receiver connects to it.
39  *
40  * Negotiating the paths of the sockets can happen in two ways. If the
41  * create-local-candidates is True then the transmitter will generate the
42  * path of the local candidate and us it as the ip filed in #FsCandidate. The
43  * transmitter will expect the path of the applications sender socket to be in
44  * the "ip" field of the remote candidates #FsCandidate as well.
45  *
46  * Or alternatively, if create-local-candidates is false then
47  * the sender socket can be created by giving the transmitter a candidate
48  * with the path of the socket in the "ip" field of the #FsCandidate. This
49  * #FsCandidate can be given to the #FsStreamTransmitter in two ways, either
50  * by setting the #FsStreamTransmitter:preferred-local-candidates property
51  * or by calling the fs_stream_transmitter_force_remote_candidates() function.
52  * There can be only one single send socket per stream. When the send socket
53  * is ready to be connected to, #FsStreamTransmitter::new-local-candidate signal
54  * will be emitted.
55  *
56  * To connect the receive side to the other application, one must create a
57  * #FsCandidate with the path of the sender's socket in the "username" field.
58  * If the receiver can not connect to the sender,
59  * the fs_stream_transmitter_force_remote_candidates() call will fail.
60  */
61 
62 #ifdef HAVE_CONFIG_H
63 #include "config.h"
64 #endif
65 
66 #include "fs-shm-stream-transmitter.h"
67 #include "fs-shm-transmitter.h"
68 
69 #include <farstream/fs-candidate.h>
70 #include <farstream/fs-conference.h>
71 
72 #include <gst/gst.h>
73 
74 #include <glib/gstdio.h>
75 
76 #include <string.h>
77 #include <sys/types.h>
78 
79 #ifdef HAVE_UNISTD_H
80 # include <unistd.h>
81 #endif
82 
83 #include <stdlib.h>
84 
85 GST_DEBUG_CATEGORY_EXTERN (fs_shm_transmitter_debug);
86 #define GST_CAT_DEFAULT fs_shm_transmitter_debug
87 
88 /* Signals */
89 enum
90 {
91   LAST_SIGNAL
92 };
93 
94 /* props */
95 enum
96 {
97   PROP_0,
98   PROP_SENDING,
99   PROP_PREFERRED_LOCAL_CANDIDATES,
100   PROP_CREATE_LOCAL_CANDIDATES,
101 };
102 
103 struct _FsShmStreamTransmitterPrivate
104 {
105   /* We don't actually hold a ref to this,
106    * But since our parent FsStream can not exist without its parent
107    * FsSession, we should be safe
108    */
109   FsShmTransmitter *transmitter;
110 
111   GList *preferred_local_candidates;
112 
113   GMutex mutex;
114 
115   /* Protected by the mutex */
116   gboolean sending;
117 
118   /* Protected by the mutex */
119   FsCandidate **candidates;
120 
121   /* Whether we create the local candidate ourselves or rely on the remote end
122    * to pass them to us as part of the candidate */
123   gboolean create_local_candidates;
124 
125   /* temporary socket directy in case we made one */
126   gchar *socket_dir;
127 
128   ShmSrc **shm_src;
129   ShmSink **shm_sink;
130 };
131 
132 #define FS_SHM_STREAM_TRANSMITTER_GET_PRIVATE(o)  \
133   (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_SHM_STREAM_TRANSMITTER, \
134                                 FsShmStreamTransmitterPrivate))
135 
136 #define FS_SHM_STREAM_TRANSMITTER_LOCK(s) \
137   g_mutex_lock (&(s)->priv->mutex)
138 #define FS_SHM_STREAM_TRANSMITTER_UNLOCK(s) \
139   g_mutex_unlock (&(s)->priv->mutex)
140 
141 static void fs_shm_stream_transmitter_class_init (FsShmStreamTransmitterClass *klass);
142 static void fs_shm_stream_transmitter_init (FsShmStreamTransmitter *self);
143 static void fs_shm_stream_transmitter_dispose (GObject *object);
144 static void fs_shm_stream_transmitter_finalize (GObject *object);
145 
146 static void fs_shm_stream_transmitter_get_property (GObject *object,
147                                                 guint prop_id,
148                                                 GValue *value,
149                                                 GParamSpec *pspec);
150 static void fs_shm_stream_transmitter_set_property (GObject *object,
151                                                 guint prop_id,
152                                                 const GValue *value,
153                                                 GParamSpec *pspec);
154 
155 static gboolean fs_shm_stream_transmitter_force_remote_candidates (
156     FsStreamTransmitter *streamtransmitter, GList *candidates,
157     GError **error);
158 static gboolean fs_shm_stream_transmitter_gather_local_candidates (
159     FsStreamTransmitter *streamtransmitter,
160     GError **error);
161 
162 static gboolean
163 fs_shm_stream_transmitter_add_sink (FsShmStreamTransmitter *self,
164     FsCandidate *candidate, GError **error);
165 
166 
167 static GObjectClass *parent_class = NULL;
168 // static guint signals[LAST_SIGNAL] = { 0 };
169 
170 static GType type = 0;
171 
172 GType
fs_shm_stream_transmitter_get_type(void)173 fs_shm_stream_transmitter_get_type (void)
174 {
175   return type;
176 }
177 
178 GType
fs_shm_stream_transmitter_register_type(FsPlugin * module G_GNUC_UNUSED)179 fs_shm_stream_transmitter_register_type (FsPlugin *module G_GNUC_UNUSED)
180 {
181   static const GTypeInfo info = {
182     sizeof (FsShmStreamTransmitterClass),
183     NULL,
184     NULL,
185     (GClassInitFunc) fs_shm_stream_transmitter_class_init,
186     NULL,
187     NULL,
188     sizeof (FsShmStreamTransmitter),
189     0,
190     (GInstanceInitFunc) fs_shm_stream_transmitter_init
191   };
192 
193   type = g_type_register_static (FS_TYPE_STREAM_TRANSMITTER,
194       "FsShmStreamTransmitter", &info, 0);
195 
196   return type;
197 }
198 
199 static void
fs_shm_stream_transmitter_class_init(FsShmStreamTransmitterClass * klass)200 fs_shm_stream_transmitter_class_init (FsShmStreamTransmitterClass *klass)
201 {
202   GObjectClass *gobject_class = (GObjectClass *) klass;
203   FsStreamTransmitterClass *streamtransmitterclass =
204     FS_STREAM_TRANSMITTER_CLASS (klass);
205   GParamSpec *pspec;
206 
207   parent_class = g_type_class_peek_parent (klass);
208 
209   gobject_class->set_property = fs_shm_stream_transmitter_set_property;
210   gobject_class->get_property = fs_shm_stream_transmitter_get_property;
211 
212   streamtransmitterclass->force_remote_candidates =
213     fs_shm_stream_transmitter_force_remote_candidates;
214   streamtransmitterclass->gather_local_candidates =
215     fs_shm_stream_transmitter_gather_local_candidates;
216 
217   g_object_class_override_property (gobject_class, PROP_SENDING, "sending");
218   g_object_class_override_property (gobject_class,
219       PROP_PREFERRED_LOCAL_CANDIDATES, "preferred-local-candidates");
220 
221   pspec = g_param_spec_boolean ("create-local-candidates",
222     "CreateLocalCandidates",
223     "Whether the transmitter should automatically create local candidates",
224     FALSE,
225     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS);
226   g_object_class_install_property (gobject_class,
227     PROP_CREATE_LOCAL_CANDIDATES,
228     pspec);
229 
230 
231   gobject_class->dispose = fs_shm_stream_transmitter_dispose;
232   gobject_class->finalize = fs_shm_stream_transmitter_finalize;
233 
234   g_type_class_add_private (klass, sizeof (FsShmStreamTransmitterPrivate));
235 }
236 
237 static void
fs_shm_stream_transmitter_init(FsShmStreamTransmitter * self)238 fs_shm_stream_transmitter_init (FsShmStreamTransmitter *self)
239 {
240   /* member init */
241   self->priv = FS_SHM_STREAM_TRANSMITTER_GET_PRIVATE (self);
242 
243   self->priv->sending = TRUE;
244 
245   g_mutex_init (&self->priv->mutex);
246 }
247 
248 static void
fs_shm_stream_transmitter_dispose(GObject * object)249 fs_shm_stream_transmitter_dispose (GObject *object)
250 {
251   FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
252   gint c; /* component_id */
253 
254   for (c = 1; c <= self->priv->transmitter->components; c++)
255   {
256     if (self->priv->shm_src[c])
257     {
258       fs_shm_transmitter_check_shm_src (self->priv->transmitter,
259           self->priv->shm_src[c], NULL);
260     }
261     self->priv->shm_src[c] = NULL;
262 
263     if (self->priv->shm_sink[c])
264     {
265       fs_shm_transmitter_check_shm_sink (self->priv->transmitter,
266           self->priv->shm_sink[c], NULL);
267     }
268     self->priv->shm_sink[c] = NULL;
269   }
270 
271   if (self->priv->socket_dir != NULL)
272     g_rmdir (self->priv->socket_dir);
273   g_free (self->priv->socket_dir);
274   self->priv->socket_dir = NULL;
275 
276   parent_class->dispose (object);
277 }
278 
279 static void
fs_shm_stream_transmitter_finalize(GObject * object)280 fs_shm_stream_transmitter_finalize (GObject *object)
281 {
282   FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
283 
284   fs_candidate_list_destroy (self->priv->preferred_local_candidates);
285 
286   g_free (self->priv->shm_src);
287   g_free (self->priv->shm_sink);
288   g_mutex_clear (&self->priv->mutex);
289 
290   parent_class->finalize (object);
291 }
292 
293 static void
fs_shm_stream_transmitter_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)294 fs_shm_stream_transmitter_get_property (GObject *object,
295                                            guint prop_id,
296                                            GValue *value,
297                                            GParamSpec *pspec)
298 {
299   FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
300 
301   switch (prop_id)
302   {
303     case PROP_SENDING:
304       FS_SHM_STREAM_TRANSMITTER_LOCK (self);
305       g_value_set_boolean (value, self->priv->sending);
306       FS_SHM_STREAM_TRANSMITTER_UNLOCK (self);
307       break;
308     case PROP_PREFERRED_LOCAL_CANDIDATES:
309       g_value_set_boxed (value, self->priv->preferred_local_candidates);
310       break;
311     case PROP_CREATE_LOCAL_CANDIDATES:
312       g_value_set_boolean (value, self->priv->create_local_candidates);
313       break;
314     default:
315       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
316       break;
317   }
318 }
319 
320 static void
fs_shm_stream_transmitter_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)321 fs_shm_stream_transmitter_set_property (GObject *object,
322                                            guint prop_id,
323                                            const GValue *value,
324                                            GParamSpec *pspec)
325 {
326   FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER (object);
327 
328   switch (prop_id) {
329     case PROP_SENDING:
330       FS_SHM_STREAM_TRANSMITTER_LOCK (self);
331       self->priv->sending = g_value_get_boolean (value);
332       if (self->priv->shm_sink[1])
333         fs_shm_transmitter_sink_set_sending (self->priv->transmitter,
334             self->priv->shm_sink[1], self->priv->sending);
335       FS_SHM_STREAM_TRANSMITTER_UNLOCK (self);
336       break;
337     case PROP_PREFERRED_LOCAL_CANDIDATES:
338       self->priv->preferred_local_candidates = g_value_dup_boxed (value);
339       break;
340     case PROP_CREATE_LOCAL_CANDIDATES:
341       self->priv->create_local_candidates = g_value_get_boolean (value);
342       break;
343     default:
344       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
345       break;
346   }
347 }
348 
349 static gboolean
fs_shm_stream_transmitter_build(FsShmStreamTransmitter * self,GError ** error)350 fs_shm_stream_transmitter_build (FsShmStreamTransmitter *self,
351   GError **error)
352 {
353   self->priv->shm_src = g_new0 (ShmSrc *,
354       self->priv->transmitter->components + 1);
355   self->priv->shm_sink = g_new0 (ShmSink *,
356       self->priv->transmitter->components + 1);
357 
358   return TRUE;
359 }
360 
361 static void
got_buffer_func(GstBuffer * buffer,guint component,gpointer data)362 got_buffer_func (GstBuffer *buffer, guint component, gpointer data)
363 {
364   FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER_CAST (data);
365 
366   g_signal_emit_by_name (self, "known-source-packet-received", component,
367       buffer);
368 }
369 
ready_cb(guint component,gchar * path,gpointer data)370 static void ready_cb (guint component, gchar *path, gpointer data)
371 {
372   FsShmStreamTransmitter *self = FS_SHM_STREAM_TRANSMITTER_CAST (data);
373   FsCandidate *candidate = fs_candidate_new (NULL, component,
374       FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, path, 0);
375 
376   GST_DEBUG ("Emitting new local candidate with path %s", path);
377 
378   g_signal_emit_by_name (self, "new-local-candidate", candidate);
379   g_signal_emit_by_name (self, "local-candidates-prepared");
380 
381   fs_candidate_destroy (candidate);
382 }
383 
384 static void
connected_cb(guint component,gint id,gpointer data)385 connected_cb (guint component, gint id, gpointer data)
386 {
387   FsShmStreamTransmitter *self = data;
388 
389   g_signal_emit_by_name (self, "state-changed", component,
390       FS_STREAM_STATE_READY);
391 }
392 
393 static gboolean
fs_shm_stream_transmitter_add_sink(FsShmStreamTransmitter * self,FsCandidate * candidate,GError ** error)394 fs_shm_stream_transmitter_add_sink (FsShmStreamTransmitter *self,
395     FsCandidate *candidate, GError **error)
396 {
397   if (self->priv->create_local_candidates)
398     return TRUE;
399 
400   if (!candidate->ip || !candidate->ip[0])
401     return TRUE;
402 
403   if (self->priv->shm_sink[candidate->component_id])
404   {
405     if (fs_shm_transmitter_check_shm_sink (self->priv->transmitter,
406             self->priv->shm_sink[candidate->component_id], candidate->ip))
407       return TRUE;
408     self->priv->shm_sink[candidate->component_id] = NULL;
409   }
410 
411   self->priv->shm_sink[candidate->component_id] =
412     fs_shm_transmitter_get_shm_sink (self->priv->transmitter,
413         candidate->component_id, candidate->ip, ready_cb, connected_cb,
414         self, error);
415 
416   if (self->priv->shm_sink[candidate->component_id] == NULL)
417     return FALSE;
418 
419   if (candidate->component_id == 1)
420     fs_shm_transmitter_sink_set_sending (self->priv->transmitter,
421         self->priv->shm_sink[candidate->component_id], self->priv->sending);
422 
423   return TRUE;
424 }
425 
426 
427 static void
disconnected_cb(guint component,gint id,gpointer data)428 disconnected_cb (guint component, gint id, gpointer data)
429 {
430   FsShmStreamTransmitter *self = data;
431 
432   g_signal_emit_by_name (self, "state-changed", component,
433       FS_STREAM_STATE_FAILED);
434 }
435 
436 static gboolean
fs_shm_stream_transmitter_force_remote_candidate(FsShmStreamTransmitter * self,FsCandidate * candidate,GError ** error)437 fs_shm_stream_transmitter_force_remote_candidate (
438     FsShmStreamTransmitter *self, FsCandidate *candidate,
439     GError **error)
440 {
441   const gchar *path;
442   if (!fs_shm_stream_transmitter_add_sink (self, candidate, error))
443     return FALSE;
444 
445   if (self->priv->create_local_candidates)
446     path = candidate->ip;
447   else
448     path = candidate->username;
449 
450   if (path && path[0])
451   {
452     if (self->priv->shm_src[candidate->component_id])
453     {
454       if (fs_shm_transmitter_check_shm_src (self->priv->transmitter,
455               self->priv->shm_src[candidate->component_id], path))
456         return TRUE;
457       self->priv->shm_src[candidate->component_id] = NULL;
458     }
459 
460     self->priv->shm_src[candidate->component_id] =
461       fs_shm_transmitter_get_shm_src (self->priv->transmitter,
462           candidate->component_id, path, got_buffer_func, disconnected_cb,
463           self, error);
464 
465     if (self->priv->shm_src[candidate->component_id] == NULL)
466       return FALSE;
467   }
468 
469   return TRUE;
470 }
471 
472 /**
473  * fs_shm_stream_transmitter_force_remote_candidates
474  */
475 
476 static gboolean
fs_shm_stream_transmitter_force_remote_candidates(FsStreamTransmitter * streamtransmitter,GList * candidates,GError ** error)477 fs_shm_stream_transmitter_force_remote_candidates (
478     FsStreamTransmitter *streamtransmitter, GList *candidates,
479     GError **error)
480 {
481   GList *item = NULL;
482   FsShmStreamTransmitter *self =
483     FS_SHM_STREAM_TRANSMITTER (streamtransmitter);
484 
485   for (item = candidates; item; item = g_list_next (item))
486   {
487     FsCandidate *candidate = item->data;
488 
489     if (candidate->component_id == 0 ||
490         candidate->component_id > self->priv->transmitter->components) {
491       g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
492           "The candidate passed has an invalid component id %u (not in [1,%u])",
493           candidate->component_id, self->priv->transmitter->components);
494       return FALSE;
495     }
496 
497     if ((!candidate->ip || !candidate->ip[0]) &&
498         (!candidate->username || !candidate->username[0]))
499     {
500       g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
501           "The candidate does not have a SINK shm segment in its ip"
502           " or a SRC shm segment in its username");
503       return FALSE;
504     }
505   }
506 
507   for (item = candidates; item; item = g_list_next (item))
508     if (!fs_shm_stream_transmitter_force_remote_candidate (self,
509             item->data, error))
510       return FALSE;
511 
512 
513   return TRUE;
514 }
515 
516 
517 FsShmStreamTransmitter *
fs_shm_stream_transmitter_newv(FsShmTransmitter * transmitter,guint n_parameters,GParameter * parameters,GError ** error)518 fs_shm_stream_transmitter_newv (FsShmTransmitter *transmitter,
519   guint n_parameters, GParameter *parameters, GError **error)
520 {
521   FsShmStreamTransmitter *streamtransmitter = NULL;
522 
523   streamtransmitter = g_object_newv (FS_TYPE_SHM_STREAM_TRANSMITTER,
524     n_parameters, parameters);
525 
526   if (!streamtransmitter) {
527     g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
528       "Could not build the stream transmitter");
529     return NULL;
530   }
531 
532   streamtransmitter->priv->transmitter = transmitter;
533 
534   if (!fs_shm_stream_transmitter_build (streamtransmitter, error)) {
535     g_object_unref (streamtransmitter);
536     return NULL;
537   }
538 
539   return streamtransmitter;
540 }
541 
542 
543 static gboolean
fs_shm_stream_transmitter_gather_local_candidates(FsStreamTransmitter * streamtransmitter,GError ** error)544 fs_shm_stream_transmitter_gather_local_candidates (
545     FsStreamTransmitter *streamtransmitter,
546     GError **error)
547 {
548   FsShmStreamTransmitter *self =
549     FS_SHM_STREAM_TRANSMITTER (streamtransmitter);
550   GList *item;
551 
552   if (self->priv->create_local_candidates)
553   {
554     guint c;
555     gchar *socket_dir;
556 
557     socket_dir = g_build_filename (g_get_tmp_dir (),
558       "farstream-shm-XXXXXX", NULL);
559 
560     if (g_mkdtemp (socket_dir) == NULL)
561       return FALSE;
562 
563     self->priv->socket_dir = socket_dir;
564 
565     for (c = 1; c <= self->priv->transmitter->components; c++)
566     {
567       gchar *path = g_strdup_printf ("%s/shm-sink-socket-%d", socket_dir, c);
568 
569       self->priv->shm_sink[c] =
570         fs_shm_transmitter_get_shm_sink (self->priv->transmitter,
571           c, path, ready_cb, connected_cb, self, error);
572       g_free (path);
573 
574       if (self->priv->shm_sink[c] == NULL)
575         return FALSE;
576 
577       if (c == 1)
578         fs_shm_transmitter_sink_set_sending (self->priv->transmitter,
579             self->priv->shm_sink[c], self->priv->sending);
580     }
581 
582     return TRUE;
583   }
584 
585   for (item = self->priv->preferred_local_candidates;
586        item;
587        item = g_list_next (item))
588   {
589     FsCandidate *candidate = item->data;
590 
591     if (candidate->ip && candidate->ip[0])
592       if (!fs_shm_stream_transmitter_add_sink (self, candidate, error))
593         return FALSE;
594   }
595 
596   return TRUE;
597 }
598