1 /*
2  * Copyright (C) 2010 Canonical, Ltd.
3  *
4  * This library is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License
6  * version 3.0 as published by the Free Software Foundation.
7  *
8  * This library is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU Lesser General Public License version 3.0 for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with this library. If not, see
15  * <http://www.gnu.org/licenses/>.
16  *
17  * Authors:
18  *      Neil Jagdish Patel <neil.patel@canonical.com>
19  *      Mikkel Kamstrup Erlandsen <mikkel.kamstrup@canonical.com>
20  *
21  */
22 /**
23  * SECTION:dee-peer
24  * @short_description: Finds other objects with the same swarm-name on the bus.
25  * @include: dee.h
26  *
 * #DeePeer allows you to build objects that can rendezvous on DBus
 * without the need for a central registration service. Think of it like
29  * peer-to-peer for your application. The DBus session bus will also implicitly
30  * elect a swarm leader - namely the one owning the swarm name on the bus, but
31  * it's up to the consumer of this API to determine whether swarm leadership has
32  * any concrete responsibilities associated.
33  *
 * Peers find each other through a well-known "swarm-name", which is a
35  * well known DBus name, such as: org.myapp.MyPeers. Choose a namespaced
36  * name that would not normally be used outside of your program.
37  *
38  * For example:
39  * <informalexample><programlisting>
40  * {
41  *   DeePeer *peer;
42  *
 *   peer = g_object_new (DEE_TYPE_PEER,
44  *                        "swarm-name", "org.myapp.MyPeers",
45  *                        NULL);
46  *
47  *   g_signal_connect (peer, "peer-found",
48  *                     G_CALLBACK (on_peer_found), NULL);
49  *   g_signal_connect (peer, "peer-lost",
50  *                     G_CALLBACK (on_peer_lost), NULL);
51  * }
52  * </programlisting></informalexample>
53  */
54 
55 #ifdef HAVE_CONFIG_H
56 #include <config.h>
57 #endif
58 
59 #include <gio/gio.h>
60 
61 #include "dee-peer.h"
62 #include "dee-marshal.h"
63 #include "trace-log.h"
64 
65 #define _DeePeerIter GSequenceIter
66 
67 /**
68  * DeePeerPrivate:
69  *
70  * Ignore this structure.
71  **/
72 struct _DeePeerPrivate
73 {
74   GDBusConnection *connection;
75 
76   /* Used as a hash set with the unique addresses of the peers.
77    * This hash table must be protected from concurrent access,
78    * since it's used in the GDBus message dispatch thread */
79   GHashTable *peers;
80 
81   /* A List with the string-formatted match rules we've installed */
82   GSList *match_rules;
83 
84   /* The GDBus filter id, so we can uninstall our message filter again */
85   guint   filter_id;
86 
87   /* GDBus id for the DBus signal subscriptions */
88   guint   dbus_signals_id;
89 
90   /* The GDBus name owner id for g_bus_own_name() */
91   guint   name_owner_id;
92 
93   /* The GDBus name watcher id from g_bus_watch_name() */
94   guint name_watcher_id;
95 
96   /* Swarm related properties */
97   gboolean     swarm_owner;
98   const gchar *own_name;
99   gchar       *swarm_name;
100   gchar       *swarm_path;
101   gchar       *swarm_leader;
102 
103   gboolean connected;
104   gboolean is_swarm_leader;
105   gboolean has_been_leader;
106   gboolean is_first_name_check;
107 
108   GCancellable *list_cancellable;
109 
110   /* if priv->head_count != NULL it indicates that we are in
111    * "head counting mode" in which case priv->head_count_source will be a
112    * GSource id for a timeout that completes the head count */
113   GSList *head_count;
114   guint   head_count_source;
115 
116   /* Protecting the priv->peers table from concurrent access in
117    * the GDBus message dispatch thread */
118 #if GLIB_CHECK_VERSION(2, 31, 16)
119   GMutex lock_real;
120 #endif
121   GMutex *lock;
122 };
123 
124 /* Globals */
125 enum
126 {
127   PROP_0,
128   PROP_SWARM_NAME,
129   PROP_SWARM_LEADER,
130   PROP_SWARM_OWNER
131 };
132 
133 enum
134 {
135   PEER_FOUND,
136   PEER_LOST,
137   CONNECTION_ACQUIRED,
138   CONNECTION_CLOSED,
139 
140   LAST_SIGNAL
141 };
142 
143 G_DEFINE_TYPE_WITH_PRIVATE (DeePeer, dee_peer, G_TYPE_OBJECT)
144 
145 static guint32 _peer_signals[LAST_SIGNAL] = { 0 };
146 
147 /* Forwards */
148 static void                 remove_match_rule        (GDBusConnection *conn,
149                                                       const gchar     *rule);
150 
151 static void                 emit_peer_found          (DeePeer    *self,
152                                                       const gchar *name);
153 
154 static void                 on_bus_acquired          (GDBusConnection *connection,
155                                                       const gchar     *name,
156                                                       gpointer         user_data);
157 
158 static void                 on_leadership_lost       (GDBusConnection *connection,
159                                                       const gchar     *name,
160                                                       gpointer         user_data);
161 
162 static void                 on_leadership_acquired   (GDBusConnection *connection,
163                                                       const gchar     *name,
164                                                       gpointer         user_data);
165 
166 static void                 on_leadership_changed    (GDBusConnection *connection,
167                                                       const gchar     *name,
168                                                       const gchar     *name_owner,
169                                                       gpointer         user_data);
170 
171 static void                 on_join_received         (DeePeer    *self,
172                                                       const gchar *peer_address);
173 
174 static void                 on_bye_received          (DeePeer    *self,
175                                                       const gchar *peer_address);
176 
177 static void                 on_ping_received         (DeePeer    *self,
178                                                       const gchar *leader_address);
179 
180 static void                 on_pong_received         (DeePeer    *self,
181                                                       const gchar *peer_address);
182 
183 static void                 on_list_received         (GObject      *source_object,
184                                                       GAsyncResult *res,
185                                                       gpointer      user_data);
186 
187 static void                 set_swarm_name           (DeePeer    *self,
188                                                       const gchar *swarm_name);
189 
190 static void                 emit_ping                (DeePeer    *self);
191 
192 static void                 emit_pong                (DeePeer    *self);
193 
194 static void                 on_dbus_peer_signal      (GDBusConnection *connection,
195                                                       const gchar     *sender_name,
196                                                       const gchar     *object_path,
197                                                       const gchar     *interface_name,
198                                                       const gchar     *signal_name,
199                                                       GVariant        *parameters,
200                                                       gpointer         user_data);
201 
202 static GDBusMessage*        gdbus_message_filter    (GDBusConnection *connection,
203                                                      GDBusMessage    *message,
204                                                      gboolean         incoming,
205                                                      gpointer         user_data);
206 
207 static const gchar*  dee_peer_real_get_swarm_leader (DeePeer *self);
208 
209 static gboolean      dee_peer_real_is_swarm_leader  (DeePeer *self);
210 
211 static GSList*       dee_peer_real_get_connections  (DeePeer *self);
212 
213 static gchar**       dee_peer_real_list_peers       (DeePeer *self);
214 
215 /* GObject methods */
216 static void
dee_peer_dispose(GObject * object)217 dee_peer_dispose (GObject *object)
218 {
219   DeePeerPrivate *priv;
220   GSList *match_iter;
221 
222   priv = DEE_PEER (object)->priv;
223 
224   /* Remove match rules from the bus, and free the string repr. of the rule  */
225   if (priv->connection)
226     {
227       /* Uninstall filter.
228        * Implementation note: We must remove the filter and signal listener
229        * _before_ dropping the swarm name because gdbus currently does a
230        * sync dbus call to release the name which makes us race against
231        * getting a NameOwnerChanged */
232       /* The removal of the filter rules also has to be done in dispose,
233        * not finalize, cause the filter callback can ref this object and
234        * therefore postpone finalization, although that shouldn't happen,
235        * as this uses locking and the call will not return until all current
236        * invocations of the filter callback finish. */
237       g_dbus_connection_remove_filter (priv->connection, priv->filter_id);
238 
239       for (match_iter = priv->match_rules;
240            match_iter != NULL;
241            match_iter = match_iter->next)
242         {
243           remove_match_rule (priv->connection, match_iter->data);
244           g_free (match_iter->data);
245         }
246 
247       /* Stop listening for signals */
248       if (priv->dbus_signals_id != 0)
249         {
250           g_dbus_connection_signal_unsubscribe (priv->connection,
251                                                 priv->dbus_signals_id);
252           priv->dbus_signals_id = 0;
253         }
254 
255       g_object_unref (priv->connection);
256       priv->connection = NULL;
257     }
258 
259   if (priv->match_rules)
260     {
261       g_slist_free (priv->match_rules);
262       priv->match_rules = NULL;
263     }
264 
265   /* Stop trying to own the swarm name.
266    * See implementation note above */
267   if (priv->name_owner_id != 0)
268     {
269       g_bus_unown_name (priv->name_owner_id);
270       priv->name_owner_id = 0;
271     }
272 
273   /* Stop listening for swarm leadership changes */
274   if (priv->name_watcher_id != 0)
275     {
276       g_bus_unwatch_name (priv->name_watcher_id);
277       priv->name_watcher_id = 0;
278     }
279 
280   G_OBJECT_CLASS (dee_peer_parent_class)->dispose (object);
281 }
282 
283 static void
dee_peer_finalize(GObject * object)284 dee_peer_finalize (GObject *object)
285 {
286   DeePeerPrivate *priv;
287 
288   priv = DEE_PEER (object)->priv;
289 
290   if (priv->list_cancellable != NULL)
291     {
292       g_cancellable_cancel (priv->list_cancellable);
293       g_object_unref (priv->list_cancellable);
294       priv->list_cancellable = NULL;
295     }
296 
297   /* Free resources */
298   if (priv->swarm_name)
299     {
300       g_free (priv->swarm_name);
301       priv->swarm_name = NULL;
302     }
303   if (priv->swarm_path)
304     {
305       g_free (priv->swarm_path);
306       priv->swarm_path = NULL;
307     }
308   if (priv->swarm_leader)
309     {
310       g_free (priv->swarm_leader);
311       priv->swarm_leader = NULL;
312     }
313   if (priv->peers)
314     {
315       g_hash_table_destroy (priv->peers);
316       priv->peers = NULL;
317     }
318   if (priv->lock != NULL)
319     {
320 #if GLIB_CHECK_VERSION(2, 31, 16)
321       g_mutex_clear (priv->lock);
322 #else
323       g_mutex_free (priv->lock);
324 #endif
325       priv->lock = NULL;
326     }
327   if (priv->head_count != NULL)
328     {
329       g_slist_foreach(priv->head_count, (GFunc) g_free, NULL);
330       g_slist_free (priv->head_count);
331       priv->head_count = NULL;
332     }
333   if (priv->head_count_source != 0)
334     {
335       g_source_remove (priv->head_count_source);
336       priv->head_count_source = 0;
337     }
338 
339   G_OBJECT_CLASS (dee_peer_parent_class)->finalize (object);
340 }
341 
342 static void
dee_peer_constructed(GObject * self)343 dee_peer_constructed (GObject *self)
344 {
345   DeePeerPrivate    *priv;
346   GBusNameOwnerFlags flags;
347 
348   priv = DEE_PEER (self)->priv;
349 
350   if (priv->swarm_name == NULL)
351     {
352       g_critical ("DeePeer created without a swarm name. You must specify "
353                   "a non-NULL swarm name");
354       return;
355     }
356 
357   /* Contend to be swarm leaders. Pick me! Pick me! */
358   flags = priv->swarm_owner ?
359     G_BUS_NAME_OWNER_FLAGS_REPLACE : G_BUS_NAME_OWNER_FLAGS_ALLOW_REPLACEMENT;
360 
361   priv->name_owner_id = g_bus_own_name (G_BUS_TYPE_SESSION,
362                                         priv->swarm_name,      /* name to own */
363                                         flags,
364                                         on_bus_acquired,
365                                         on_leadership_acquired,
366                                         on_leadership_lost,
367                                         self,                    /* user data */
368                                         NULL);                   /* free func */
369 
370   /* Listen for changes in leadership */
371   priv->name_watcher_id = g_bus_watch_name(G_BUS_TYPE_SESSION,
372                                            priv->swarm_name,
373                                            G_BUS_NAME_WATCHER_FLAGS_NONE,
374                                            on_leadership_changed,
375                                            NULL, /* name vanished cb */
376                                            self, /* user data */
377                                            NULL); /* user data free func */
378 }
379 
380 
381 static void
dee_peer_set_property(GObject * object,guint id,const GValue * value,GParamSpec * pspec)382 dee_peer_set_property (GObject       *object,
383                        guint          id,
384                        const GValue  *value,
385                        GParamSpec    *pspec)
386 {
387   DeePeerPrivate *priv = DEE_PEER (object)->priv;
388 
389 
390   switch (id)
391     {
392     case PROP_SWARM_NAME:
393       set_swarm_name (DEE_PEER (object), g_value_get_string (value));
394       break;
395     case PROP_SWARM_LEADER:
396       g_free (priv->swarm_leader);
397       priv->swarm_leader = g_value_dup_string (value);
398       break;
399     case PROP_SWARM_OWNER:
400       priv->swarm_owner = g_value_get_boolean (value);
401       break;
402     default:
403       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
404       break;
405     }
406 }
407 
408 static void
dee_peer_get_property(GObject * object,guint id,GValue * value,GParamSpec * pspec)409 dee_peer_get_property (GObject     *object,
410                         guint        id,
411                         GValue      *value,
412                         GParamSpec  *pspec)
413 {
414   switch (id)
415     {
416     case PROP_SWARM_NAME:
417       g_value_set_string (value, DEE_PEER (object)->priv->swarm_name);
418       break;
419     case PROP_SWARM_LEADER:
420       g_value_set_string (value, dee_peer_get_swarm_leader (DEE_PEER (object)));
421       break;
422     case PROP_SWARM_OWNER:
423       g_value_set_boolean (value, DEE_PEER (object)->priv->swarm_owner);
424       break;
425     default:
426       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
427       break;
428     }
429 }
430 
431 static void
dee_peer_class_init(DeePeerClass * klass)432 dee_peer_class_init (DeePeerClass *klass)
433 {
434   GObjectClass *obj_class = G_OBJECT_CLASS (klass);
435   GParamSpec   *pspec;
436 
437   obj_class->dispose      = dee_peer_dispose;
438   obj_class->finalize     = dee_peer_finalize;
439   obj_class->set_property = dee_peer_set_property;
440   obj_class->get_property = dee_peer_get_property;
441   obj_class->constructed  = dee_peer_constructed;
442 
443   /* Virtual methods */
444   klass->get_swarm_leader = dee_peer_real_get_swarm_leader;
445   klass->is_swarm_leader  = dee_peer_real_is_swarm_leader;
446   klass->get_connections  = dee_peer_real_get_connections;
447   klass->list_peers       = dee_peer_real_list_peers;
448 
449   /* Add Signals */
450 
451   /**
452    * DeePeer::peer-found:
453    * @self: the #DeePeer on which the signal is emitted
454    * @name: the DBus name of the object found
455    *
456    * Connect to this signal to be notified of existing and new peers that are
457    *   in your swarm.
458    **/
459   _peer_signals[PEER_FOUND] =
460     g_signal_new ("peer-found",
461                   G_TYPE_FROM_CLASS (klass),
462                   G_SIGNAL_RUN_LAST,
463                   G_STRUCT_OFFSET (DeePeerClass, peer_found),
464                   NULL, NULL,
465                   g_cclosure_marshal_VOID__STRING,
466                   G_TYPE_NONE, 1,
467                   G_TYPE_STRING);
468 
469   /**
470    * DeePeer::peer-lost:
471    * @self: the #DeePeer on which the signal is emitted
472    * @name: the DBus name of the object that disconnected
473    *
474    * Connect to this signal to be notified when peers disconnect from the swarm
475    **/
476   _peer_signals[PEER_LOST] =
477     g_signal_new ("peer-lost",
478                   G_TYPE_FROM_CLASS (klass),
479                   G_SIGNAL_RUN_LAST,
480                   G_STRUCT_OFFSET (DeePeerClass, peer_lost),
481                   NULL, NULL,
482                   g_cclosure_marshal_VOID__STRING,
483                   G_TYPE_NONE, 1,
484                   G_TYPE_STRING);
485 
486   /**
487    * DeePeer::new-connection:
488    * @self: the #DeePeer on which the signal is emitted
489    * @connection: the new #GDBusConnection
490    *
491    * Connect to this signal to be notified when peers connect via
492    * new #GDBusConnection.
493    **/
494   _peer_signals[CONNECTION_ACQUIRED] =
495     g_signal_new ("connection-acquired",
496                   G_TYPE_FROM_CLASS (klass),
497                   G_SIGNAL_RUN_LAST,
498                   G_STRUCT_OFFSET (DeePeerClass, connection_acquired),
499                   NULL, NULL,
500                   g_cclosure_marshal_VOID__OBJECT,
501                   G_TYPE_NONE, 1,
502                   G_TYPE_DBUS_CONNECTION);
503 
504   /**
505    * DeePeer::connection-closed:
506    * @self: the #DeePeer on which the signal is emitted
507    * @connection: the closed #GDBusConnection
508    *
509    * Connect to this signal to be notified when peers close
510    * their #GDBusConnection.
511    **/
512   _peer_signals[CONNECTION_CLOSED] =
513     g_signal_new ("connection-closed",
514                   G_TYPE_FROM_CLASS (klass),
515                   G_SIGNAL_RUN_LAST,
516                   G_STRUCT_OFFSET (DeePeerClass, connection_closed),
517                   NULL, NULL,
518                   g_cclosure_marshal_VOID__OBJECT,
519                   G_TYPE_NONE, 1,
520                   G_TYPE_DBUS_CONNECTION);
521 
522   /* Add properties */
523   /**
524    * DeePeer::swarm-name:
525    *
526    * The name of the swarm that this peer is connected to. All swarm members
527    * will try and own this name on the session bus. The one owning the name
528    * is the swarm leader.
529    */
530   pspec = g_param_spec_string ("swarm-name", "Swarm Name",
531                                "Well-known name to find other peers with",
532                                NULL,
533                                G_PARAM_READWRITE | G_PARAM_CONSTRUCT
534                                | G_PARAM_STATIC_STRINGS);
535   g_object_class_install_property (obj_class, PROP_SWARM_NAME, pspec);
536 
537   /**
538    * DeePeer::swarm-leader:
539    *
540    * The name of the swarm that this peer is connected to. All swarm members
541    * will try and own this name on the session bus. The one owning the name
542    * is the swarm leader.
543    **/
544   pspec = g_param_spec_string ("swarm-leader", "Swarm Leader",
545                                "Unique DBus address of the swarm leader",
546                                NULL,
547                                G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
548   g_object_class_install_property (obj_class, PROP_SWARM_LEADER, pspec);
549 
550   /**
551    * DeePeer::swarm-owner:
552    *
553    * If set, this peer will try to become a leader of the swarm.
554    *
555    * Creating a #DeeSharedModel with a peer that successfully assumes ownership
556    * of a swarm will skip cloning of the model, therefore you need to set
557    * the schema and fill the model with data yourself.
558    *
559    * Setting this property to TRUE does NOT guarantee that this peer will
560    * become a leader. You should always check the :swarm-leader property.
561    **/
562   pspec = g_param_spec_boolean ("swarm-owner", "Swarm Owner",
563                                 "Try to assume leadership of the swarm",
564                                 FALSE,
565                                 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY
566                                 | G_PARAM_STATIC_STRINGS);
567   g_object_class_install_property (obj_class, PROP_SWARM_OWNER, pspec);
568 }
569 
570 static void
dee_peer_init(DeePeer * peer)571 dee_peer_init (DeePeer *peer)
572 {
573   DeePeerPrivate *priv;
574 
575   priv = peer->priv = dee_peer_get_instance_private (peer);
576 
577   priv->swarm_name = NULL;
578   priv->swarm_leader = NULL;
579   priv->own_name = NULL;
580   priv->match_rules = NULL;
581   priv->peers = g_hash_table_new_full (g_str_hash,
582                                        g_str_equal,
583                                        (GDestroyNotify) g_free,
584                                        NULL);
585 
586   priv->connected = FALSE;
587   priv->is_swarm_leader = FALSE;
588   priv->has_been_leader = FALSE;
589   priv->is_first_name_check = TRUE;
590 
591   priv->list_cancellable = NULL;
592 
593 #if GLIB_CHECK_VERSION(2, 31, 16)
594   g_mutex_init (&priv->lock_real);
595   priv->lock = &priv->lock_real;
596 #else
597   priv->lock = g_mutex_new ();
598 #endif
599 
600   priv->head_count_source = 0;
601 }
602 
603 /* Private Methods */
604 
605 
606 /* Async callback for com.canonical.Dee.Peer.List */
607 static void
on_list_received(GObject * source_object,GAsyncResult * res,gpointer user_data)608 on_list_received (GObject      *source_object,
609                   GAsyncResult *res,
610                   gpointer      user_data)
611 {
612   DeePeer        *self;
613   DeePeerPrivate *priv;
614   GHashTable     *peers, *old_peers_ht;
615   GSList         *new_peers, *iter;
616   guint           i;
617   GVariant       *val, *_val;
618   const gchar    **names;
619   gsize           n_names;
620   GError         *error;
621   GHashTableIter  hiter;
622   gpointer        hkey, hval;
623 
624   error = NULL;
625   _val = g_dbus_connection_call_finish (G_DBUS_CONNECTION (source_object),
626                                         res, &error);
627   if (error != NULL)
628     {
629       if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
630         {
631           g_warning ("%s: Unable to list peers: %s", G_STRLOC, error->message);
632         }
633       g_error_free (error);
634       return;
635     }
636 
637   g_return_if_fail (DEE_IS_PEER (user_data));
638   self = DEE_PEER (user_data);
639   priv = self->priv;
640 
641   /* Unpack the wrapping struct from the reply */
642   val = g_variant_get_child_value (_val, 0);
643   g_variant_unref (_val);
644 
645   names = g_variant_get_strv (val, &n_names);
646   trace_object (self, "Got list of %d peers", n_names);
647 
648   /* We diff the current list of peers against the new list
649    * and emit signals accordingly. New peers are added to new_peers,
650    * and lost peers will remain in priv->peers: */
651   new_peers = NULL;
652   peers = g_hash_table_new_full (g_str_hash,
653                                  g_str_equal,
654                                  (GDestroyNotify)g_free,
655                                  NULL);
656 
657   g_mutex_lock (priv->lock);
658   for (i = 0; i < n_names; i++)
659     {
660       g_hash_table_insert (peers, g_strdup (names[i]), NULL);
661       if (!g_hash_table_remove (priv->peers, names[i]))
662         {
663           /* The peer was not previously known */
664           new_peers = g_slist_prepend (new_peers, (gchar *) names[i]);
665         }
666     }
667 
668   /* Signal about lost peers */
669   g_hash_table_iter_init (&hiter, priv->peers);
670   while (g_hash_table_iter_next (&hiter, &hkey, &hval))
671     {
672       g_signal_emit (self, _peer_signals[PEER_LOST], 0, hkey);
673     }
674 
675   old_peers_ht = priv->peers;
676   priv->peers = peers;
677   g_mutex_unlock (priv->lock);
678 
679   /* Signal about new peers */
680   for (iter = new_peers; iter; iter = iter->next)
681     {
682       emit_peer_found (self, (const gchar*)iter->data);
683     }
684 
685   /* The return value of g_variant_get_strv() is a shallow copy */
686   g_free (names);
687   g_variant_unref (val);
688 
689   /* Free just the array, not the strings - they are owned by 'peers' now */
690   g_slist_free (new_peers);
691   g_hash_table_destroy (old_peers_ht);
692 }
693 
694 /* Install a DBus match rule, async, described by a printf-like format string.
695  * The match rule will be properly retracted when @self is finalized */
696 static void
install_match_rule(DeePeer * self,const char * rule,...)697 install_match_rule (DeePeer *self, const char *rule, ...)
698 {
699   DeePeerPrivate *priv;
700   gchar          *f_rule;
701   va_list         args;
702 
703   g_return_if_fail (DEE_IS_PEER (self));
704   g_return_if_fail (rule != NULL);
705 
706   priv = self->priv;
707 
708 	va_start (args, rule);
709   f_rule = g_strdup_vprintf (rule, args);
710   va_end (args);
711 
712   /* By setting the error argument to NULL libdbus will use async mode
713    * for adding the match rule. We want that. */
714   g_dbus_connection_call (priv->connection,
715                           "org.freedesktop.DBus",
716                           "/org/freedesktop/dbus",
717                           "org.freedesktop.DBus",
718                           "AddMatch",
719                           g_variant_new ("(s)", f_rule),
720                           NULL, /* reply type */
721                           G_DBUS_CALL_FLAGS_NONE,
722                           -1,
723                           NULL,  /* cancellable */
724                           NULL,  /* callback */
725                           NULL); /* user_data */
726 
727   priv->match_rules = g_slist_prepend (priv->match_rules, f_rule);
728 }
729 
730 static void
remove_match_rule(GDBusConnection * conn,const gchar * rule)731 remove_match_rule (GDBusConnection *conn, const gchar *rule)
732 {
733   g_dbus_connection_call (conn,
734                           "org.freedesktop.DBus",
735                           "/org/freedesktop/dbus",
736                           "org.freedesktop.DBus",
737                           "RemoveMatch",
738                           g_variant_new ("(s)", rule),
739                           NULL, /* reply type */
740                           G_DBUS_CALL_FLAGS_NONE,
741                           -1,
742                           NULL,  /* cancellable */
743                           NULL,  /* callback */
744                           NULL); /* user_data */
745 }
746 
747 static const gchar*
dee_peer_real_get_swarm_leader(DeePeer * self)748 dee_peer_real_get_swarm_leader (DeePeer *self)
749 {
750   return self->priv->swarm_leader;
751 }
752 
753 static gboolean
dee_peer_real_is_swarm_leader(DeePeer * self)754 dee_peer_real_is_swarm_leader (DeePeer *self)
755 {
756   return self->priv->is_swarm_leader;
757 }
758 
759 static GSList*
dee_peer_real_get_connections(DeePeer * self)760 dee_peer_real_get_connections (DeePeer *self)
761 {
762   GSList *list = NULL;
763 
764   if (self->priv->connection)
765     {
766       list = g_slist_append (list, self->priv->connection);
767     }
768 
769   return list;
770 }
771 
772 static gchar**
dee_peer_real_list_peers(DeePeer * self)773 dee_peer_real_list_peers (DeePeer *self)
774 {
775   DeePeerPrivate *priv;
776   GHashTableIter iter;
777   gpointer key, value;
778   gchar **result;
779   int i;
780 
781   priv = self->priv;
782   i = 0;
783 
784   g_mutex_lock (priv->lock);
785   result = g_new (gchar*, g_hash_table_size (priv->peers) + 1);
786   g_hash_table_iter_init (&iter, priv->peers);
787   while (g_hash_table_iter_next (&iter, &key, &value))
788     {
789       result[i++] = g_strdup ((gchar*) key);
790     }
791   g_mutex_unlock (priv->lock);
792 
793   result[i] = NULL;
794 
795   return result;
796 }
797 
798 /* Public Methods */
799 
800 /**
801  * dee_peer_new:
802  * @swarm_name: The name of the swarm to join.
803  *              Fx &quot;org.example.DataProviders&quot;
804  *
805  * Create a new #DeePeer. The peer will immediately connect to the swarm
806  * and start the peer discovery.
807  *
808  * Return value: (transfer full): A newly constructed #DeePeer.
809  *               Free with g_object_unref().
810  */
811 DeePeer*
dee_peer_new(const gchar * swarm_name)812 dee_peer_new (const gchar* swarm_name)
813 {
814   g_return_val_if_fail (swarm_name != NULL, NULL);
815 
816   return g_object_new (DEE_TYPE_PEER, "swarm-name", swarm_name, NULL);
817 }
818 
819 /**
820  * dee_peer_is_swarm_leader:
821  * @self: a #DeePeer
822  *
823  * Return value: %TRUE if and only if this peer owns the swarm name on
824  *               the session bus
825  */
826 gboolean
dee_peer_is_swarm_leader(DeePeer * self)827 dee_peer_is_swarm_leader (DeePeer *self)
828 {
829   g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
830 
831   DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
832   return klass->is_swarm_leader (self);
833 }
834 
835 /**
836  * dee_peer_get_swarm_leader:
837  * @self: a #DeePeer
838  *
839  * In case this peer is connected to a message bus, gets the unique DBus
840  * address of the current swarm leader, otherwise returns id of the leader.
841  *
842  * Return value: Unique DBus address of the current swarm leader,
843  *    possibly %NULL if the leader has not been detected yet
844  */
845 const gchar*
dee_peer_get_swarm_leader(DeePeer * self)846 dee_peer_get_swarm_leader (DeePeer *self)
847 {
848   g_return_val_if_fail (DEE_IS_PEER (self), NULL);
849 
850   DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
851   return klass->get_swarm_leader (self);
852 }
853 
854 /**
855  * dee_peer_get_swarm_name:
856  * @self: a #DeePeer
857  *
858  * Gets the unique name for this swarm. The swarm leader is the Peer owning
859  * this name on the session bus.
860  *
861  * Return value: The swarm name
862  **/
863 const gchar*
dee_peer_get_swarm_name(DeePeer * self)864 dee_peer_get_swarm_name (DeePeer *self)
865 {
866   g_return_val_if_fail (DEE_IS_PEER (self), NULL);
867 
868   return self->priv->swarm_name;
869 }
870 
871 /**
872  * dee_peer_get_connections:
873  * @self: a #DeePeer
874  *
875  * Gets list of #GDBusConnection instances used by this #DeePeer instance.
876  *
877  * Return value: (transfer container) (element-type Gio.DBusConnection):
878  *               List of connections.
879  */
880 GSList*
dee_peer_get_connections(DeePeer * self)881 dee_peer_get_connections (DeePeer *self)
882 {
883   g_return_val_if_fail (DEE_IS_PEER (self), NULL);
884 
885   DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
886   return klass->get_connections (self);
887 }
888 
889 /**
890  * dee_peer_list_peers:
891  * @self: a #DeePeer
892  *
893  * Gets list of all peers currently in this swarm.
894  *
895  * Return value: (transfer full): List of peers (free using g_strfreev()).
896  */
897 gchar**
dee_peer_list_peers(DeePeer * self)898 dee_peer_list_peers (DeePeer *self)
899 {
900   g_return_val_if_fail (DEE_IS_PEER (self), NULL);
901 
902   DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
903   return klass->list_peers (self);
904 }
905 
906 /**
907  * dee_peer_is_swarm_owner:
908  * @self: a #DeePeer
909  *
910  * Gets the value of the :swarm-owner property.
911  *
912  * Note that this does NOT mean that the peer is leader of the swarm! Check also
913  * dee_peer_is_swarm_leader().
914  *
915  * Return value: TRUE if the :swarm-owner property was set during construction.
916  */
917 gboolean
dee_peer_is_swarm_owner(DeePeer * self)918 dee_peer_is_swarm_owner (DeePeer *self)
919 {
920   g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
921 
922   return self->priv->swarm_owner;
923 }
924 
925 static void
emit_peer_found(DeePeer * self,const gchar * name)926 emit_peer_found (DeePeer     *self,
927                  const gchar *name)
928 {
929   DeePeerPrivate *priv;
930 
931   g_return_if_fail (DEE_IS_PEER(self));
932   g_return_if_fail (name != NULL);
933 
934   priv = self->priv;
935 
936   if (!g_str_equal (name, priv->own_name))
937     {
938       g_signal_emit (self, _peer_signals[PEER_FOUND], 0, name);
939     }
940 }
941 
942 static void
set_swarm_name(DeePeer * self,const gchar * swarm_name)943 set_swarm_name (DeePeer    *self,
944                 const gchar *swarm_name)
945 {
946   DeePeerPrivate *priv;
947   gchar           *dummy;
948 
949   g_return_if_fail (DEE_IS_PEER (self));
950   g_return_if_fail (swarm_name != NULL);
951   priv = self->priv;
952 
953   if (priv->swarm_name)
954     {
955       g_warning ("%s: Unable to set previously set swarm_name (%s) to (%s)",
956                  G_STRLOC,
957                  priv->swarm_name,
958                  swarm_name);
959       return;
960     }
961 
962   /* If swarm_name is org.example.MyService then the swarm_path will
963    * become /com/canonical/dee/org/example/MyService. Note that
964    * the actual object path of the peer is not used in the Swarm spec */
965 
966   priv->swarm_name = g_strdup (swarm_name);
967   dummy = g_strdelimit (g_strdup(swarm_name), ".", '/');
968   priv->swarm_path = g_strdup_printf ("/com/canonical/dee/peer/%s", dummy);
969 
970   g_free (dummy);
971 }
972 
973 static void
dispose_weak_ref(gpointer data)974 dispose_weak_ref (gpointer data)
975 {
976   GWeakRef *weak_ref = (GWeakRef*) data;
977   g_weak_ref_clear (weak_ref);
978   g_free (data);
979 }
980 
981 /* Called when we get the bus connection the first time. */
982 static void
on_bus_acquired(GDBusConnection * connection,const gchar * name,gpointer user_data)983 on_bus_acquired (GDBusConnection *connection,
984                  const gchar     *name,
985                  gpointer         user_data)
986 {
987   DeePeer        *self;
988   DeePeerPrivate *priv;
989   GWeakRef       *weak_ref;
990   GPtrArray      *ptr_array;
991 
992   g_return_if_fail (DEE_IS_PEER (user_data));
993 
994   self = DEE_PEER (user_data);
995   priv = self->priv;
996   priv->connection = g_object_ref (connection);
997   priv->own_name = g_strdup (g_dbus_connection_get_unique_name (connection));
998 
999   g_signal_emit (self, _peer_signals[CONNECTION_ACQUIRED], 0, priv->connection);
1000 
1001   /* Using GPtrArray as a ref-count container for the weak ref */
1002   weak_ref = (GWeakRef*) g_new (GWeakRef, 1);
1003   g_weak_ref_init (weak_ref, self);
1004   ptr_array = g_ptr_array_new_full (1, dispose_weak_ref);
1005   g_ptr_array_add (ptr_array, weak_ref);
1006 
1007   /* FIXME: the last param should be g_ptr_array_unref, but there's a bug
1008    * in gio that can cause a crash, we'll rather have a small leak than
1009    * random crashes.
1010    * https://bugzilla.gnome.org/show_bug.cgi?id=704568 */
1011   priv->filter_id = g_dbus_connection_add_filter (priv->connection,
1012                                                   gdbus_message_filter,
1013                                                   ptr_array,
1014                                                   NULL);
1015 
1016   /* Detect when someone joins the swarm */
1017   install_match_rule (self,
1018                       "interface='org.freedesktop.DBus',"
1019                       "member='RequestName',"
1020                       "arg0='%s'",
1021                       priv->swarm_name);
1022 
1023   /* Listen for all signals on the Dee interface concerning this swarm */
1024   priv->dbus_signals_id =
1025       g_dbus_connection_signal_subscribe(priv->connection,
1026                                          NULL,                /* sender */
1027                                          DEE_PEER_DBUS_IFACE, /* iface */
1028                                          NULL,                /* member */
1029                                          NULL,                /* object path */
1030                                          priv->swarm_name,    /* arg0 */
1031                                          G_DBUS_SIGNAL_FLAGS_NONE,
1032                                          on_dbus_peer_signal,      /* callback */
1033                                          self,                /* user_data */
1034                                          NULL);               /* user data free */
1035 }
1036 
1037 static void
assume_leadership(DeePeer * self)1038 assume_leadership (DeePeer *self)
1039 {
1040   DeePeerPrivate *priv;
1041 
1042   g_return_if_fail (DEE_IS_PEER (self));
1043 
1044   priv = self->priv;
1045   if (priv->is_swarm_leader)
1046     {
1047       trace_object (self, "Leadership acquired, but we are already leaders");
1048     }
1049   else
1050     {
1051       trace_object (self, "Got swarm leadership");
1052 
1053       /* The first time we become leaders we install a broad match rule
1054        * that triggers any time someone drops off the bus.
1055        * Please note that we don't bother cleaning up that rule in the
1056        * rare case we loose leadership (which only happens if someone
1057        * forcefully grabs the swarm name) */
1058       if (!priv->has_been_leader)
1059         {
1060           install_match_rule (self, "interface='org.freedesktop.DBus',"
1061                                     "member='NameOwnerChanged',"
1062                                     "arg2=''");
1063         }
1064 
1065       priv->is_swarm_leader = TRUE;
1066       priv->has_been_leader = TRUE;
1067 
1068       g_free (priv->swarm_leader);
1069       priv->swarm_leader = g_strdup (priv->own_name);
1070 
1071       /* Emit a Ping so we can do a head count */
1072       emit_ping (self);
1073 
1074       /* Signal emission must be a "tail call"
1075        * because we can in theory be finalized by
1076        * one of the callbacks */
1077       g_object_notify (G_OBJECT (self), "swarm-leader");
1078     }
1079 
1080 }
1081 
1082 /* GDBus callback from the name owner installed with g_bus_own_name().
1083  * Called when losing leadership. */
1084 static void
on_leadership_lost(GDBusConnection * connection,const gchar * name,gpointer user_data)1085 on_leadership_lost (GDBusConnection *connection,
1086                     const gchar     *name,
1087                     gpointer         user_data)
1088 {
1089   DeePeer        *self;
1090   DeePeerPrivate *priv;
1091 
1092   g_return_if_fail (DEE_IS_PEER (user_data));
1093 
1094   self = DEE_PEER (user_data);
1095   priv = self->priv;
1096 
1097   if (priv->is_swarm_leader)
1098     {
1099       /* We signal the change of swarm leadership in on_ping_received(),
1100        * only at that point do we know the unique name of the new leader */
1101       trace_object (self, "Lost swarm leadership");
1102       // FIXME. We ought to remove the Pong match rule, but it's not paramount
1103       priv->is_swarm_leader = FALSE;
1104     }
1105   else
1106     {
1107       trace_object (self, "Did not become leader");
1108     }
1109 
1110   /* If this is the first time we are notified that we are not the leader
1111    * then request a roster from the leader */
1112   if (priv->is_first_name_check)
1113     {
1114       trace_object (self, "Requesting peer roster from leader");
1115       if (priv->list_cancellable)
1116         {
1117           g_cancellable_cancel (priv->list_cancellable);
1118           g_object_unref (priv->list_cancellable);
1119         }
1120       priv->list_cancellable = g_cancellable_new ();
1121       g_dbus_connection_call (priv->connection,
1122                               priv->swarm_name,
1123                               priv->swarm_path,
1124                               DEE_PEER_DBUS_IFACE,
1125                               "List",
1126                               g_variant_new ("()"),
1127                               NULL,                   /* reply type */
1128                               G_DBUS_CALL_FLAGS_NONE,
1129                               -1,
1130                               priv->list_cancellable, /* cancellable */
1131                               on_list_received,       /* callback */
1132                               self);                  /* user_data */
1133       priv->is_first_name_check = FALSE;
1134     }
1135 }
1136 
1137 /* GDBus callback from the name owner installed with g_bus_own_name().
1138  * Called when we become leaders. */
1139 static void
on_leadership_acquired(GDBusConnection * connection,const gchar * name,gpointer user_data)1140 on_leadership_acquired (GDBusConnection *connection,
1141                         const gchar     *name,
1142                         gpointer         user_data)
1143 {
1144   g_return_if_fail (DEE_IS_PEER (user_data));
1145 
1146   assume_leadership (DEE_PEER (user_data));
1147 }
1148 
1149 /* Callback from the GDBus name watcher installed with g_bus_watch_name() */
1150 static void
on_leadership_changed(GDBusConnection * connection,const gchar * name,const gchar * name_owner,gpointer user_data)1151 on_leadership_changed (GDBusConnection *connection,
1152                        const gchar     *name,
1153                        const gchar     *name_owner,
1154                        gpointer         user_data)
1155 {
1156   DeePeer        *self;
1157   DeePeerPrivate *priv;
1158 
1159   g_return_if_fail (DEE_IS_PEER (user_data));
1160 
1161   self = DEE_PEER (user_data);
1162   priv = self->priv;
1163 
1164   /* Don't bother if we already know this leader */
1165   if (g_strcmp0 (priv->swarm_leader, name_owner) == 0)
1166     return;
1167 
1168   /* At this point we assume we have a new leader */
1169   if (g_strcmp0 (priv->own_name, name_owner) == 0)
1170     assume_leadership (self);
1171   else
1172     {
1173       g_free (priv->swarm_leader);
1174       priv->swarm_leader = g_strdup (name_owner);
1175       priv->is_swarm_leader = FALSE;
1176       g_object_notify (G_OBJECT (self), "swarm-leader");
1177     }
1178 }
1179 
1180 /* Callback from gdbus_message_filter() for custom match rules
1181  * Indicates that @peer_address joined the swarm.
1182  * This method is thread safe */
1183 static void
on_join_received(DeePeer * self,const gchar * peer_address)1184 on_join_received (DeePeer     *self,
1185                   const gchar *peer_address)
1186 {
1187   DeePeerPrivate *priv;
1188 
1189   g_return_if_fail (DEE_IS_PEER (self));
1190   g_return_if_fail (peer_address != NULL);
1191 
1192   trace_object (self, "Found peer %s", peer_address);
1193   priv = self->priv;
1194 
1195   /* If the peer is already known it must have tried to acquire the swarm name
1196    * twice...  Just ignore it */
1197   g_mutex_lock (priv->lock);
1198   if (g_hash_table_lookup_extended (priv->peers, peer_address, NULL, NULL))
1199     {
1200       g_mutex_unlock (priv->lock);
1201       return;
1202     }
1203 
1204   g_hash_table_insert (priv->peers, g_strdup (peer_address), NULL);
1205   g_mutex_unlock (priv->lock);
1206 
1207   emit_peer_found (self, peer_address);
1208 }
1209 
1210 /* Callback from _gdbus_message_filter() for custom match rules
1211  * Indicates that @peer_address left the swarm */
1212 static void
on_bye_received(DeePeer * self,const gchar * peer_address)1213 on_bye_received (DeePeer    *self,
1214                  const gchar *peer_address)
1215 {
1216   DeePeerPrivate *priv;
1217   gboolean        removed;
1218 
1219   g_return_if_fail (DEE_IS_PEER (self));
1220   g_return_if_fail (peer_address != NULL);
1221 
1222   trace_object (self, "Bye %s", peer_address);
1223   priv = self->priv;
1224 
1225   g_mutex_lock (priv->lock);
1226   removed = g_hash_table_remove (self->priv->peers, peer_address);
1227   g_mutex_unlock (priv->lock);
1228 
1229   if (removed)
1230     {
1231       trace_object (self, "Leader said Bye to %s", peer_address);
1232       g_signal_emit (self, _peer_signals[PEER_LOST], 0, peer_address);
1233     }
1234   else
1235     {
1236       trace_object (self, "Unknown peer '%s' dropped out of the swarm",
1237                     peer_address);
1238     }
1239 
1240 }
1241 
1242 /* Broadcast a Bye signal to to notify the swarm that someone left.
1243  * Only call this method as swarm leader - that's the contract
1244  * of the Swarm spec.
1245  * This method is thread safe */
1246 static void
emit_bye(DeePeer * self,const gchar * peer_address)1247 emit_bye (DeePeer     *self,
1248           const gchar *peer_address)
1249 {
1250   DeePeerPrivate *priv;
1251 
1252   g_return_if_fail (DEE_IS_PEER (self));
1253   g_return_if_fail (self->priv->is_swarm_leader);
1254   g_return_if_fail (self->priv->connection != NULL);
1255   g_return_if_fail (peer_address != NULL);
1256 
1257   trace_object (self, "Emit Bye(%s)", peer_address);
1258 
1259   g_signal_emit (self, _peer_signals[PEER_LOST], 0, peer_address);
1260 
1261   priv = self->priv;
1262   g_dbus_connection_emit_signal (priv->connection,
1263                                  NULL,                 /* destination */
1264                                  priv->swarm_path,     /* object path */
1265                                  DEE_PEER_DBUS_IFACE,  /* interface */
1266                                  "Bye",                /* signal name */
1267                                  g_variant_new ("(ss)",
1268                                                 priv->swarm_name, peer_address),
1269                                  NULL);                /* error */
1270 }
1271 
1272 /* Timeout started when we receive a Ping. When this timeout triggers
1273  * we do a diff of priv->head_count and priv->peers and emit peer-lost
1274  * as appropriate. We don't need to emit peer-found because that is already
1275  * done in when receiving the Pongs from the peers */
1276 static gboolean
on_head_count_complete(DeePeer * self)1277 on_head_count_complete (DeePeer *self)
1278 {
1279   DeePeerPrivate *priv;
1280   GHashTable     *new_peers;
1281   gpointer        hkey, hval;
1282   GHashTableIter  hiter;
1283   GSList         *iter;
1284 
1285   g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
1286 
1287   priv = self->priv;
1288 
1289   /* First we build a new_peers hash set with the names of the
1290    * head counted peers. Then we diff the old and the new peers
1291    * sets and emit peer-lost appropriately */
1292   new_peers = g_hash_table_new_full (g_str_hash,
1293                                        g_str_equal,
1294                                        (GDestroyNotify) g_free,
1295                                        NULL);
1296 
1297   /* Build new_peers hash set */
1298   iter = priv->head_count;
1299   for (iter = priv->head_count; iter; iter = iter->next)
1300     {
1301       g_hash_table_insert (new_peers, g_strdup (iter->data), NULL);
1302     }
1303 
1304   /* Emit peer-lost and Bye on peers that didn't emit Pong in due time */
1305   g_mutex_lock (priv->lock);
1306   g_hash_table_iter_init (&hiter, priv->peers);
1307   while (g_hash_table_iter_next (&hiter, &hkey, &hval))
1308     {
1309       if (!g_hash_table_lookup_extended (new_peers, hkey, NULL, NULL))
1310         {
1311           if (priv->is_swarm_leader)
1312             emit_bye (self, hkey);
1313           else
1314             g_signal_emit (self, _peer_signals[PEER_LOST], 0, hkey);
1315 
1316         }
1317     }
1318 
1319   /* Swap old and new peers hash sets */
1320   g_hash_table_destroy (priv->peers);
1321   priv->peers = new_peers;
1322   g_mutex_unlock (priv->lock);
1323 
1324   /* Unregister the head count timeout source. And reset head counting mode */
1325   priv->head_count_source = 0;
1326   g_slist_foreach (priv->head_count, (GFunc) g_free, NULL);
1327   g_slist_free (priv->head_count);
1328   priv->head_count = NULL;
1329 
1330   return FALSE;
1331 }
1332 
1333 /* Indicates that @leader_address send a Ping to the swarm to
1334  * initiate a head count */
1335 static void
on_ping_received(DeePeer * self,const gchar * leader_address)1336 on_ping_received (DeePeer    *self,
1337                   const gchar *leader_address)
1338 {
1339   DeePeerPrivate *priv;
1340 
1341   g_return_if_fail (DEE_IS_PEER (self));
1342   g_return_if_fail (leader_address != NULL);
1343 
1344   priv = self->priv;
1345 
1346   trace_object (self, "Got Ping from: %s", leader_address);
1347 
1348   /* When we receive a Ping (and note that the swarm leader will receive its
1349    * own Ping as well) we enter a "head count mode" where we will consider
1350    * all peers that haven't given us a Pong within a certain timeout for lost.
1351    *
1352    * We indicate that we are in head counting mode by setting
1353    * priv->head_count != NULL
1354    */
1355   if (priv->head_count)
1356     {
1357       g_slist_foreach (priv->head_count, (GFunc) g_free, NULL);
1358       g_slist_free (priv->head_count);
1359     }
1360 
1361   priv->head_count = g_slist_prepend (NULL, g_strdup (priv->own_name));
1362   if (priv->head_count_source != 0)
1363     {
1364       /* Reset the timer if we got another ping */
1365       g_source_remove (priv->head_count_source);
1366     }
1367   priv->head_count_source = g_timeout_add (500,
1368                                            (GSourceFunc) on_head_count_complete,
1369                                            self);
1370 
1371   /* The DeePeer protocol says we must respond with a Pong to any Ping */
1372   emit_pong(self);
1373 }
1374 
1375 /* Indicates that @peer_address has emitted a Pong  */
1376 static void
on_pong_received(DeePeer * self,const gchar * peer_address)1377 on_pong_received (DeePeer    *self,
1378                   const gchar *peer_address)
1379 {
1380   DeePeerPrivate *priv;
1381 
1382   g_return_if_fail (DEE_IS_PEER (self));
1383   g_return_if_fail (peer_address != NULL);
1384 
1385   priv = self->priv;
1386   trace_object (self, "Got pong %s", peer_address);
1387 
1388   g_mutex_lock (priv->lock);
1389   if (!g_hash_table_lookup_extended (priv->peers, peer_address, NULL, NULL))
1390     {
1391       g_hash_table_insert (priv->peers, g_strdup (peer_address), NULL);
1392 
1393       emit_peer_found (self, peer_address);
1394     }
1395   g_mutex_unlock (priv->lock);
1396 
1397   /* If we are in head counting mode register this Ping */
1398   if (priv->head_count)
1399     priv->head_count = g_slist_prepend (priv->head_count,
1400                                         g_strdup (peer_address));
1401 }
1402 
1403 static void
on_dbus_peer_signal(GDBusConnection * connection,const gchar * sender_name,const gchar * object_path,const gchar * interface_name,const gchar * signal_name,GVariant * parameters,gpointer user_data)1404 on_dbus_peer_signal (GDBusConnection *connection,
1405                     const gchar      *sender_name,
1406                     const gchar      *object_path,
1407                     const gchar      *interface_name,
1408                     const gchar      *signal_name,
1409                     GVariant         *parameters,
1410                     gpointer          user_data)
1411 {
1412   DeePeer          *self;
1413   gchar            *peer_address = NULL;
1414 
1415   g_return_if_fail (DEE_IS_PEER (user_data));
1416 
1417   self = DEE_PEER (user_data);
1418 
1419   if (g_strcmp0 ("Bye", signal_name) == 0)
1420     {
1421       g_variant_get (parameters, "(ss)", NULL, &peer_address);
1422       on_bye_received (self, peer_address);
1423     }
1424   else if (g_strcmp0 ("Ping", signal_name) == 0)
1425     on_ping_received (self, sender_name);
1426   else if (g_strcmp0 ("Pong", signal_name) == 0)
1427     on_pong_received (self, sender_name);
1428   else
1429     g_critical ("Unexpected signal from peer %s: %s.%s",
1430                 sender_name, interface_name, signal_name);
1431 }
1432 
1433 /* Broadcast a Ping signal to do a head-count on the swarm.
1434  * Only call this method as swarm leader - that's the contract
1435  * of the Swarm spec.
1436  * This method is thread safe */
1437 static void
emit_ping(DeePeer * self)1438 emit_ping (DeePeer    *self)
1439 {
1440   DeePeerPrivate *priv;
1441 
1442   g_return_if_fail (DEE_IS_PEER (self));
1443   g_return_if_fail (self->priv->is_swarm_leader);
1444   g_return_if_fail (self->priv->connection != NULL);
1445 
1446   trace_object (self, "Emit ping");
1447 
1448   priv = self->priv;
1449   g_dbus_connection_emit_signal (priv->connection,
1450                                  NULL,                 /* destination */
1451                                  priv->swarm_path,     /* object path */
1452                                  DEE_PEER_DBUS_IFACE,  /* interface */
1453                                  "Ping",               /* signal name */
1454                                  g_variant_new ("(s)", priv->swarm_name),
1455                                  NULL);                /* error */
1456 }
1457 
1458 /* Broadcast a Pong signal as a response to a Ping.
1459  * This method is thread safe */
1460 static void
emit_pong(DeePeer * self)1461 emit_pong (DeePeer    *self)
1462 {
1463   DeePeerPrivate *priv;
1464 
1465   g_return_if_fail (DEE_IS_PEER (self));
1466   g_return_if_fail (self->priv->connection != NULL);
1467 
1468   trace_object (self, "Emit pong");
1469 
1470   priv = self->priv;
1471   g_dbus_connection_emit_signal (priv->connection,
1472                                  NULL,                 /* destination */
1473                                  priv->swarm_path,     /* object path */
1474                                  DEE_PEER_DBUS_IFACE,  /* interface */
1475                                  "Pong",               /* signal name */
1476                                  g_variant_new ("(s)", priv->swarm_name),
1477                                  NULL);                /* error */
1478 }
1479 
1480 /* Return floating variant of type '(as)' with unique DBus names of all peers.
1481  * This method is thread safe */
1482 static GVariant*
build_peer_list(DeePeer * self)1483 build_peer_list (DeePeer *self)
1484 {
1485   DeePeerPrivate  *priv;
1486   GHashTableIter   iter;
1487   GVariantBuilder  b;
1488   gpointer         key, val;
1489 
1490   g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
1491 
1492   priv = self->priv;
1493 
1494   g_variant_builder_init (&b, G_VARIANT_TYPE ("(as)"));
1495   g_variant_builder_open (&b, G_VARIANT_TYPE ("as"));
1496 
1497   g_mutex_lock (priv->lock);
1498   g_hash_table_iter_init (&iter, priv->peers);
1499   while (g_hash_table_iter_next (&iter, &key, &val))
1500   {
1501     g_variant_builder_add (&b, "s", key);
1502   }
1503   g_mutex_unlock (priv->lock);
1504 
1505   g_variant_builder_close (&b);
1506   return g_variant_builder_end (&b);
1507 }
1508 
1509 /* This method is thread safe */
1510 static gboolean
check_method(GDBusMessage * msg,const gchar * iface,const gchar * member,const gchar * path)1511 check_method (GDBusMessage     *msg,
1512                const gchar      *iface,
1513                const gchar      *member,
1514                const gchar      *path)
1515 {
1516   return msg != NULL &&
1517          G_DBUS_MESSAGE_TYPE_METHOD_CALL == g_dbus_message_get_message_type (msg) &&
1518          (iface == NULL || g_strcmp0 (g_dbus_message_get_interface (msg), iface) == 0) &&
1519          (member == NULL || g_strcmp0 (g_dbus_message_get_member (msg), member) == 0) &&
1520          (path == NULL || g_strcmp0 (g_dbus_message_get_path (msg), path) == 0);
1521 }
1522 
1523 /* This method is thread safe */
1524 static gboolean
check_signal(GDBusMessage * msg,const gchar * iface,const gchar * member,const gchar * path)1525 check_signal (GDBusMessage     *msg,
1526                const gchar      *iface,
1527                const gchar      *member,
1528                const gchar      *path)
1529 {
1530   return msg != NULL &&
1531          G_DBUS_MESSAGE_TYPE_SIGNAL == g_dbus_message_get_message_type (msg) &&
1532          (iface == NULL || g_strcmp0 (g_dbus_message_get_interface (msg), iface) == 0) &&
1533          (member == NULL || g_strcmp0 (g_dbus_message_get_member (msg), member) == 0) &&
1534          (path == NULL || g_strcmp0 (g_dbus_message_get_path (msg), path) == 0);
1535 }
1536 
1537 /* Used to transfer data to the mainloop.
1538  * Use only for good, not evil, and only from gdbus_message_filter() */
1539 static gboolean
transfer_to_mainloop(gpointer * args)1540 transfer_to_mainloop (gpointer *args)
1541 {
1542   GPtrArray *ptr_array;
1543   GWeakRef *weak_ref;
1544   GObject *object;
1545   GFunc cb = (GFunc) args[0];
1546 
1547   ptr_array = (GPtrArray*) args[1];
1548   weak_ref = (GWeakRef*) g_ptr_array_index (ptr_array, 0);
1549 
1550   object = (GObject*) g_weak_ref_get (weak_ref);
1551   if (object != NULL)
1552     {
1553       cb (object, args[2]);
1554       g_object_unref (object);
1555     }
1556 
1557   g_ptr_array_unref (ptr_array);
1558   g_free (args[2]);
1559   g_free (args);
1560 
1561   return FALSE;
1562 }
1563 
1564 /* Callback applied to all incoming DBus messages. We use this to grab
1565  * messages for our match rules and dispatch to the right on_*_received
1566  * function.
1567  * WARNING: This callback is run in the GDBus message handling thread -
1568  *          and NOT in the mainloop! */
1569 static GDBusMessage*
gdbus_message_filter(GDBusConnection * connection,GDBusMessage * msg,gboolean incoming,gpointer user_data)1570 gdbus_message_filter (GDBusConnection *connection,
1571                       GDBusMessage    *msg,
1572                       gboolean         incoming,
1573                       gpointer         user_data)
1574 {
1575   DeePeer          *self;
1576   DeePeerPrivate   *priv;
1577   GVariant         *body;
1578   GDBusMessageType  msg_type;
1579   const gchar      *sender_address;
1580   gpointer         *data;
1581   GPtrArray        *ptr_array;
1582   GWeakRef         *weak_ref;
1583 
1584   ptr_array = (GPtrArray*) user_data;
1585   weak_ref = (GWeakRef*) g_ptr_array_index (ptr_array, 0);
1586   body = g_dbus_message_get_body (msg);
1587   sender_address = g_dbus_message_get_sender (msg);
1588   msg_type = g_dbus_message_get_message_type (msg);
1589 
1590   /* We have no business with outgoing messages */
1591   if (!incoming)
1592     return msg;
1593 
1594   /* We're only interested in method calls and signals */
1595   if (msg_type != G_DBUS_MESSAGE_TYPE_METHOD_CALL &&
1596       msg_type != G_DBUS_MESSAGE_TYPE_SIGNAL)
1597     return msg;
1598 
1599   /*trace ("FILTER: %p", user_data);
1600     trace ("Msg filter: From: %s, Iface: %s, Member: %s",
1601            dbus_message_get_sender (msg),
1602            dbus_message_get_interface (msg),
1603            dbus_message_get_member (msg));*/
1604 
1605   /* Important note: Apps consuming this lib will likely install custom match
1606    *                 rules which will trigger this filter. Hence we must do very
1607    *                 strict matching before we dispatch our methods */
1608 
1609   if (check_method (msg, "org.freedesktop.DBus", "RequestName", NULL) &&
1610       g_strcmp0 (sender_address, g_dbus_connection_get_unique_name (connection)) != 0 &&
1611       body != NULL)
1612     {
1613       gchar *swarm_name;
1614 
1615       self = (DeePeer*) g_weak_ref_get (weak_ref);
1616       if (self == NULL) return msg;
1617       priv = self->priv;
1618 
1619       g_variant_get (body, "(su)", &swarm_name, NULL);
1620       if (g_strcmp0 (swarm_name, priv->swarm_name) == 0)
1621         {
1622           /* Call on_join_received() in the main loop */
1623           data = g_new (gpointer, 3);
1624           data[0] = on_join_received;
1625           data[1] = g_ptr_array_ref (ptr_array);
1626           data[2] = g_strdup (sender_address);
1627           g_idle_add ((GSourceFunc) transfer_to_mainloop, data);
1628         }
1629 
1630       g_object_unref (self);
1631       g_free (swarm_name);
1632     }
1633   else if (check_signal (msg, "org.freedesktop.DBus", "NameOwnerChanged", NULL) && body != NULL)
1634     {
1635       gchar *old_address, *new_address, *peer_address;
1636       gboolean should_emit_bye;
1637 
1638       self = (DeePeer*) g_weak_ref_get (weak_ref);
1639       if (self == NULL) return msg;
1640       priv = self->priv;
1641 
1642       g_variant_get (body, "(sss)", &peer_address, &old_address, &new_address);
1643 
1644       /* Check if a known peer dropped off the bus and emit the Bye signal
1645        * if we are the swarm leaders */
1646       g_mutex_lock (priv->lock);
1647       should_emit_bye = priv->is_swarm_leader &&
1648                         g_strcmp0 (peer_address, old_address) == 0 &&
1649                         g_strcmp0 (new_address, "") == 0 &&
1650                         g_strcmp0 (peer_address, g_dbus_connection_get_unique_name (connection)) != 0 &&
1651                         g_hash_table_lookup_extended (priv->peers,
1652                                                       peer_address,
1653                                                       NULL,
1654                                                       NULL);
1655       g_mutex_unlock (priv->lock);
1656 
1657       if (should_emit_bye)
1658         {
1659           /* Call emit_bye() in the main loop */
1660           data = g_new (gpointer, 3);
1661           data[0] = emit_bye;
1662           data[1] = g_ptr_array_ref (ptr_array);
1663           data[2] = peer_address; // own
1664           g_idle_add ((GSourceFunc) transfer_to_mainloop, data);
1665           peer_address = NULL;
1666         }
1667       g_object_unref (self);
1668       g_free (old_address);
1669       g_free (new_address);
1670       g_free (peer_address);
1671     }
1672   else
1673     {
1674       self = (DeePeer*) g_weak_ref_get (weak_ref);
1675       if (self == NULL) return msg;
1676       priv = self->priv;
1677 
1678       if (check_method (msg, DEE_PEER_DBUS_IFACE, "List", priv->swarm_path))
1679         {
1680           /* We don't want to go through the whole GDBus
1681            * interface/introspection setup just to export the List method.
1682            * We just handle this particular method inline */
1683           GDBusMessage *reply;
1684           reply = g_dbus_message_new_method_reply (msg);
1685           g_dbus_message_set_body (reply, build_peer_list (self));
1686           g_dbus_connection_send_message (connection,
1687                                           reply,
1688                                           G_DBUS_SEND_MESSAGE_FLAGS_NONE,
1689                                           NULL,   /* out serial */
1690                                           NULL);  /* error */
1691           g_object_unref (reply);
1692 
1693           g_object_unref (self);
1694           /* Convince GDBus that we handled this message by returning NULL */
1695           return NULL;
1696         }
1697       g_object_unref (self);
1698     }
1699 
1700   return msg;
1701 }
1702