1 /*
2 * Copyright (C) 2010 Canonical, Ltd.
3 *
4 * This library is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License
6 * version 3.0 as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License version 3.0 for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library. If not, see
15 * <http://www.gnu.org/licenses/>.
16 *
17 * Authors:
18 * Neil Jagdish Patel <neil.patel@canonical.com>
19 * Mikkel Kamstrup Erlandsen <mikkel.kamstrup@canonical.com>
20 *
21 */
22 /**
23 * SECTION:dee-peer
24 * @short_description: Finds other objects with the same swarm-name on the bus.
25 * @include: dee.h
26 *
 * #DeePeer allows you to build objects that can rendezvous on DBus
 * without the need for a central registration service. Think of it like
 * peer-to-peer for your application. The DBus session bus will also implicitly
 * elect a swarm leader - namely the one owning the swarm name on the bus, but
 * it's up to the consumer of this API to determine whether swarm leadership has
 * any concrete responsibilities associated.
33 *
 * Peers find each other through a well-known "swarm-name", which is a
 * well-known DBus name, such as: org.myapp.MyPeers. Choose a namespaced
 * name that would not normally be used outside of your program.
37 *
38 * For example:
39 * <informalexample><programlisting>
40 * {
41 * DeePeer *peer;
42 *
 *   peer = g_object_new (DEE_TYPE_PEER,
 *                        "swarm-name", "org.myapp.MyPeers",
 *                        NULL);
46 *
47 * g_signal_connect (peer, "peer-found",
48 * G_CALLBACK (on_peer_found), NULL);
49 * g_signal_connect (peer, "peer-lost",
50 * G_CALLBACK (on_peer_lost), NULL);
51 * }
52 * </programlisting></informalexample>
53 */
54
55 #ifdef HAVE_CONFIG_H
56 #include <config.h>
57 #endif
58
59 #include <gio/gio.h>
60
61 #include "dee-peer.h"
62 #include "dee-marshal.h"
63 #include "trace-log.h"
64
/* Alias that maps the _DeePeerIter name onto GSequenceIter.
 * NOTE(review): no use of this alias is visible in this part of the
 * file - confirm whether it is still needed. */
#define _DeePeerIter GSequenceIter
66
67 /**
68 * DeePeerPrivate:
69 *
70 * Ignore this structure.
71 **/
struct _DeePeerPrivate
{
  /* Bus connection handed to on_bus_acquired(); a reference is taken there
   * and dropped again in dispose */
  GDBusConnection *connection;

  /* Used as a hash set with the unique addresses of the peers.
   * This hash table must be protected from concurrent access,
   * since it's used in the GDBus message dispatch thread */
  GHashTable *peers;

  /* A list with the string-formatted match rules we've installed.
   * The strings are heap allocated and freed in dispose */
  GSList *match_rules;

  /* The GDBus filter id, so we can uninstall our message filter again */
  guint filter_id;

  /* GDBus id for the DBus signal subscriptions */
  guint dbus_signals_id;

  /* The GDBus name owner id for g_bus_own_name() */
  guint name_owner_id;

  /* The GDBus name watcher id from g_bus_watch_name() */
  guint name_watcher_id;

  /* Swarm related properties:
   *  - swarm_owner:  value of the "swarm-owner" construct property
   *  - own_name:     copy (g_strdup) of the unique name of our connection,
   *                  made in on_bus_acquired()
   *  - swarm_name:   the well-known name of the swarm
   *  - swarm_path:   object path derived from swarm_name in set_swarm_name()
   *  - swarm_leader: unique name of the current leader, or NULL */
  gboolean swarm_owner;
  const gchar *own_name;
  gchar *swarm_name;
  gchar *swarm_path;
  gchar *swarm_leader;

  /* NOTE(review): 'connected' is only initialized to FALSE in the visible
   * code - confirm where it is updated */
  gboolean connected;
  /* TRUE while this peer holds swarm leadership */
  gboolean is_swarm_leader;
  /* TRUE once this peer has been leader at least once */
  gboolean has_been_leader;
  /* TRUE until on_leadership_lost() has requested the roster once */
  gboolean is_first_name_check;

  /* Cancels the pending asynchronous "List" call, if any */
  GCancellable *list_cancellable;

  /* if priv->head_count != NULL it indicates that we are in
   * "head counting mode" in which case priv->head_count_source will be a
   * GSource id for a timeout that completes the head count */
  GSList *head_count;
  guint head_count_source;

  /* Protecting the priv->peers table from concurrent access in
   * the GDBus message dispatch thread. With GLib >= 2.31.16 the mutex is
   * embedded (lock_real) and 'lock' points at it; with older GLib 'lock'
   * is allocated with g_mutex_new() */
#if GLIB_CHECK_VERSION(2, 31, 16)
  GMutex lock_real;
#endif
  GMutex *lock;
};
123
124 /* Globals */
/* GObject property ids. Id 0 (PROP_0) is a placeholder and is never
 * installed as a property. */
enum
{
  PROP_0 = 0,
  PROP_SWARM_NAME,
  PROP_SWARM_LEADER,
  PROP_SWARM_OWNER
};
132
/* Indices into _peer_signals[]; LAST_SIGNAL is the number of signals. */
enum
{
  PEER_FOUND = 0,
  PEER_LOST,
  CONNECTION_ACQUIRED,
  CONNECTION_CLOSED,

  LAST_SIGNAL
};
142
/* Registers the DeePeer type as a GObject subclass with a DeePeerPrivate
 * instance-private area; also provides dee_peer_parent_class and
 * dee_peer_get_instance_private() used below. */
G_DEFINE_TYPE_WITH_PRIVATE (DeePeer, dee_peer, G_TYPE_OBJECT)

/* Signal ids returned by g_signal_new() in dee_peer_class_init(),
 * indexed by the PEER_FOUND ... CONNECTION_CLOSED enum values */
static guint32 _peer_signals[LAST_SIGNAL] = { 0 };
146
147 /* Forwards */
148 static void remove_match_rule (GDBusConnection *conn,
149 const gchar *rule);
150
151 static void emit_peer_found (DeePeer *self,
152 const gchar *name);
153
154 static void on_bus_acquired (GDBusConnection *connection,
155 const gchar *name,
156 gpointer user_data);
157
158 static void on_leadership_lost (GDBusConnection *connection,
159 const gchar *name,
160 gpointer user_data);
161
162 static void on_leadership_acquired (GDBusConnection *connection,
163 const gchar *name,
164 gpointer user_data);
165
166 static void on_leadership_changed (GDBusConnection *connection,
167 const gchar *name,
168 const gchar *name_owner,
169 gpointer user_data);
170
171 static void on_join_received (DeePeer *self,
172 const gchar *peer_address);
173
174 static void on_bye_received (DeePeer *self,
175 const gchar *peer_address);
176
177 static void on_ping_received (DeePeer *self,
178 const gchar *leader_address);
179
180 static void on_pong_received (DeePeer *self,
181 const gchar *peer_address);
182
183 static void on_list_received (GObject *source_object,
184 GAsyncResult *res,
185 gpointer user_data);
186
187 static void set_swarm_name (DeePeer *self,
188 const gchar *swarm_name);
189
190 static void emit_ping (DeePeer *self);
191
192 static void emit_pong (DeePeer *self);
193
194 static void on_dbus_peer_signal (GDBusConnection *connection,
195 const gchar *sender_name,
196 const gchar *object_path,
197 const gchar *interface_name,
198 const gchar *signal_name,
199 GVariant *parameters,
200 gpointer user_data);
201
202 static GDBusMessage* gdbus_message_filter (GDBusConnection *connection,
203 GDBusMessage *message,
204 gboolean incoming,
205 gpointer user_data);
206
207 static const gchar* dee_peer_real_get_swarm_leader (DeePeer *self);
208
209 static gboolean dee_peer_real_is_swarm_leader (DeePeer *self);
210
211 static GSList* dee_peer_real_get_connections (DeePeer *self);
212
213 static gchar** dee_peer_real_list_peers (DeePeer *self);
214
215 /* GObject methods */
216 static void
dee_peer_dispose(GObject * object)217 dee_peer_dispose (GObject *object)
218 {
219 DeePeerPrivate *priv;
220 GSList *match_iter;
221
222 priv = DEE_PEER (object)->priv;
223
224 /* Remove match rules from the bus, and free the string repr. of the rule */
225 if (priv->connection)
226 {
227 /* Uninstall filter.
228 * Implementation note: We must remove the filter and signal listener
229 * _before_ dropping the swarm name because gdbus currently does a
230 * sync dbus call to release the name which makes us race against
231 * getting a NameOwnerChanged */
232 /* The removal of the filter rules also has to be done in dispose,
233 * not finalize, cause the filter callback can ref this object and
234 * therefore postpone finalization, although that shouldn't happen,
235 * as this uses locking and the call will not return until all current
236 * invocations of the filter callback finish. */
237 g_dbus_connection_remove_filter (priv->connection, priv->filter_id);
238
239 for (match_iter = priv->match_rules;
240 match_iter != NULL;
241 match_iter = match_iter->next)
242 {
243 remove_match_rule (priv->connection, match_iter->data);
244 g_free (match_iter->data);
245 }
246
247 /* Stop listening for signals */
248 if (priv->dbus_signals_id != 0)
249 {
250 g_dbus_connection_signal_unsubscribe (priv->connection,
251 priv->dbus_signals_id);
252 priv->dbus_signals_id = 0;
253 }
254
255 g_object_unref (priv->connection);
256 priv->connection = NULL;
257 }
258
259 if (priv->match_rules)
260 {
261 g_slist_free (priv->match_rules);
262 priv->match_rules = NULL;
263 }
264
265 /* Stop trying to own the swarm name.
266 * See implementation note above */
267 if (priv->name_owner_id != 0)
268 {
269 g_bus_unown_name (priv->name_owner_id);
270 priv->name_owner_id = 0;
271 }
272
273 /* Stop listening for swarm leadership changes */
274 if (priv->name_watcher_id != 0)
275 {
276 g_bus_unwatch_name (priv->name_watcher_id);
277 priv->name_watcher_id = 0;
278 }
279
280 G_OBJECT_CLASS (dee_peer_parent_class)->dispose (object);
281 }
282
283 static void
dee_peer_finalize(GObject * object)284 dee_peer_finalize (GObject *object)
285 {
286 DeePeerPrivate *priv;
287
288 priv = DEE_PEER (object)->priv;
289
290 if (priv->list_cancellable != NULL)
291 {
292 g_cancellable_cancel (priv->list_cancellable);
293 g_object_unref (priv->list_cancellable);
294 priv->list_cancellable = NULL;
295 }
296
297 /* Free resources */
298 if (priv->swarm_name)
299 {
300 g_free (priv->swarm_name);
301 priv->swarm_name = NULL;
302 }
303 if (priv->swarm_path)
304 {
305 g_free (priv->swarm_path);
306 priv->swarm_path = NULL;
307 }
308 if (priv->swarm_leader)
309 {
310 g_free (priv->swarm_leader);
311 priv->swarm_leader = NULL;
312 }
313 if (priv->peers)
314 {
315 g_hash_table_destroy (priv->peers);
316 priv->peers = NULL;
317 }
318 if (priv->lock != NULL)
319 {
320 #if GLIB_CHECK_VERSION(2, 31, 16)
321 g_mutex_clear (priv->lock);
322 #else
323 g_mutex_free (priv->lock);
324 #endif
325 priv->lock = NULL;
326 }
327 if (priv->head_count != NULL)
328 {
329 g_slist_foreach(priv->head_count, (GFunc) g_free, NULL);
330 g_slist_free (priv->head_count);
331 priv->head_count = NULL;
332 }
333 if (priv->head_count_source != 0)
334 {
335 g_source_remove (priv->head_count_source);
336 priv->head_count_source = 0;
337 }
338
339 G_OBJECT_CLASS (dee_peer_parent_class)->finalize (object);
340 }
341
342 static void
dee_peer_constructed(GObject * self)343 dee_peer_constructed (GObject *self)
344 {
345 DeePeerPrivate *priv;
346 GBusNameOwnerFlags flags;
347
348 priv = DEE_PEER (self)->priv;
349
350 if (priv->swarm_name == NULL)
351 {
352 g_critical ("DeePeer created without a swarm name. You must specify "
353 "a non-NULL swarm name");
354 return;
355 }
356
357 /* Contend to be swarm leaders. Pick me! Pick me! */
358 flags = priv->swarm_owner ?
359 G_BUS_NAME_OWNER_FLAGS_REPLACE : G_BUS_NAME_OWNER_FLAGS_ALLOW_REPLACEMENT;
360
361 priv->name_owner_id = g_bus_own_name (G_BUS_TYPE_SESSION,
362 priv->swarm_name, /* name to own */
363 flags,
364 on_bus_acquired,
365 on_leadership_acquired,
366 on_leadership_lost,
367 self, /* user data */
368 NULL); /* free func */
369
370 /* Listen for changes in leadership */
371 priv->name_watcher_id = g_bus_watch_name(G_BUS_TYPE_SESSION,
372 priv->swarm_name,
373 G_BUS_NAME_WATCHER_FLAGS_NONE,
374 on_leadership_changed,
375 NULL, /* name vanished cb */
376 self, /* user data */
377 NULL); /* user data free func */
378 }
379
380
381 static void
dee_peer_set_property(GObject * object,guint id,const GValue * value,GParamSpec * pspec)382 dee_peer_set_property (GObject *object,
383 guint id,
384 const GValue *value,
385 GParamSpec *pspec)
386 {
387 DeePeerPrivate *priv = DEE_PEER (object)->priv;
388
389
390 switch (id)
391 {
392 case PROP_SWARM_NAME:
393 set_swarm_name (DEE_PEER (object), g_value_get_string (value));
394 break;
395 case PROP_SWARM_LEADER:
396 g_free (priv->swarm_leader);
397 priv->swarm_leader = g_value_dup_string (value);
398 break;
399 case PROP_SWARM_OWNER:
400 priv->swarm_owner = g_value_get_boolean (value);
401 break;
402 default:
403 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
404 break;
405 }
406 }
407
408 static void
dee_peer_get_property(GObject * object,guint id,GValue * value,GParamSpec * pspec)409 dee_peer_get_property (GObject *object,
410 guint id,
411 GValue *value,
412 GParamSpec *pspec)
413 {
414 switch (id)
415 {
416 case PROP_SWARM_NAME:
417 g_value_set_string (value, DEE_PEER (object)->priv->swarm_name);
418 break;
419 case PROP_SWARM_LEADER:
420 g_value_set_string (value, dee_peer_get_swarm_leader (DEE_PEER (object)));
421 break;
422 case PROP_SWARM_OWNER:
423 g_value_set_boolean (value, DEE_PEER (object)->priv->swarm_owner);
424 break;
425 default:
426 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
427 break;
428 }
429 }
430
431 static void
dee_peer_class_init(DeePeerClass * klass)432 dee_peer_class_init (DeePeerClass *klass)
433 {
434 GObjectClass *obj_class = G_OBJECT_CLASS (klass);
435 GParamSpec *pspec;
436
437 obj_class->dispose = dee_peer_dispose;
438 obj_class->finalize = dee_peer_finalize;
439 obj_class->set_property = dee_peer_set_property;
440 obj_class->get_property = dee_peer_get_property;
441 obj_class->constructed = dee_peer_constructed;
442
443 /* Virtual methods */
444 klass->get_swarm_leader = dee_peer_real_get_swarm_leader;
445 klass->is_swarm_leader = dee_peer_real_is_swarm_leader;
446 klass->get_connections = dee_peer_real_get_connections;
447 klass->list_peers = dee_peer_real_list_peers;
448
449 /* Add Signals */
450
451 /**
452 * DeePeer::peer-found:
453 * @self: the #DeePeer on which the signal is emitted
454 * @name: the DBus name of the object found
455 *
456 * Connect to this signal to be notified of existing and new peers that are
457 * in your swarm.
458 **/
459 _peer_signals[PEER_FOUND] =
460 g_signal_new ("peer-found",
461 G_TYPE_FROM_CLASS (klass),
462 G_SIGNAL_RUN_LAST,
463 G_STRUCT_OFFSET (DeePeerClass, peer_found),
464 NULL, NULL,
465 g_cclosure_marshal_VOID__STRING,
466 G_TYPE_NONE, 1,
467 G_TYPE_STRING);
468
469 /**
470 * DeePeer::peer-lost:
471 * @self: the #DeePeer on which the signal is emitted
472 * @name: the DBus name of the object that disconnected
473 *
474 * Connect to this signal to be notified when peers disconnect from the swarm
475 **/
476 _peer_signals[PEER_LOST] =
477 g_signal_new ("peer-lost",
478 G_TYPE_FROM_CLASS (klass),
479 G_SIGNAL_RUN_LAST,
480 G_STRUCT_OFFSET (DeePeerClass, peer_lost),
481 NULL, NULL,
482 g_cclosure_marshal_VOID__STRING,
483 G_TYPE_NONE, 1,
484 G_TYPE_STRING);
485
486 /**
487 * DeePeer::new-connection:
488 * @self: the #DeePeer on which the signal is emitted
489 * @connection: the new #GDBusConnection
490 *
491 * Connect to this signal to be notified when peers connect via
492 * new #GDBusConnection.
493 **/
494 _peer_signals[CONNECTION_ACQUIRED] =
495 g_signal_new ("connection-acquired",
496 G_TYPE_FROM_CLASS (klass),
497 G_SIGNAL_RUN_LAST,
498 G_STRUCT_OFFSET (DeePeerClass, connection_acquired),
499 NULL, NULL,
500 g_cclosure_marshal_VOID__OBJECT,
501 G_TYPE_NONE, 1,
502 G_TYPE_DBUS_CONNECTION);
503
504 /**
505 * DeePeer::connection-closed:
506 * @self: the #DeePeer on which the signal is emitted
507 * @connection: the closed #GDBusConnection
508 *
509 * Connect to this signal to be notified when peers close
510 * their #GDBusConnection.
511 **/
512 _peer_signals[CONNECTION_CLOSED] =
513 g_signal_new ("connection-closed",
514 G_TYPE_FROM_CLASS (klass),
515 G_SIGNAL_RUN_LAST,
516 G_STRUCT_OFFSET (DeePeerClass, connection_closed),
517 NULL, NULL,
518 g_cclosure_marshal_VOID__OBJECT,
519 G_TYPE_NONE, 1,
520 G_TYPE_DBUS_CONNECTION);
521
522 /* Add properties */
523 /**
524 * DeePeer::swarm-name:
525 *
526 * The name of the swarm that this peer is connected to. All swarm members
527 * will try and own this name on the session bus. The one owning the name
528 * is the swarm leader.
529 */
530 pspec = g_param_spec_string ("swarm-name", "Swarm Name",
531 "Well-known name to find other peers with",
532 NULL,
533 G_PARAM_READWRITE | G_PARAM_CONSTRUCT
534 | G_PARAM_STATIC_STRINGS);
535 g_object_class_install_property (obj_class, PROP_SWARM_NAME, pspec);
536
537 /**
538 * DeePeer::swarm-leader:
539 *
540 * The name of the swarm that this peer is connected to. All swarm members
541 * will try and own this name on the session bus. The one owning the name
542 * is the swarm leader.
543 **/
544 pspec = g_param_spec_string ("swarm-leader", "Swarm Leader",
545 "Unique DBus address of the swarm leader",
546 NULL,
547 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
548 g_object_class_install_property (obj_class, PROP_SWARM_LEADER, pspec);
549
550 /**
551 * DeePeer::swarm-owner:
552 *
553 * If set, this peer will try to become a leader of the swarm.
554 *
555 * Creating a #DeeSharedModel with a peer that successfully assumes ownership
556 * of a swarm will skip cloning of the model, therefore you need to set
557 * the schema and fill the model with data yourself.
558 *
559 * Setting this property to TRUE does NOT guarantee that this peer will
560 * become a leader. You should always check the :swarm-leader property.
561 **/
562 pspec = g_param_spec_boolean ("swarm-owner", "Swarm Owner",
563 "Try to assume leadership of the swarm",
564 FALSE,
565 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY
566 | G_PARAM_STATIC_STRINGS);
567 g_object_class_install_property (obj_class, PROP_SWARM_OWNER, pspec);
568 }
569
570 static void
dee_peer_init(DeePeer * peer)571 dee_peer_init (DeePeer *peer)
572 {
573 DeePeerPrivate *priv;
574
575 priv = peer->priv = dee_peer_get_instance_private (peer);
576
577 priv->swarm_name = NULL;
578 priv->swarm_leader = NULL;
579 priv->own_name = NULL;
580 priv->match_rules = NULL;
581 priv->peers = g_hash_table_new_full (g_str_hash,
582 g_str_equal,
583 (GDestroyNotify) g_free,
584 NULL);
585
586 priv->connected = FALSE;
587 priv->is_swarm_leader = FALSE;
588 priv->has_been_leader = FALSE;
589 priv->is_first_name_check = TRUE;
590
591 priv->list_cancellable = NULL;
592
593 #if GLIB_CHECK_VERSION(2, 31, 16)
594 g_mutex_init (&priv->lock_real);
595 priv->lock = &priv->lock_real;
596 #else
597 priv->lock = g_mutex_new ();
598 #endif
599
600 priv->head_count_source = 0;
601 }
602
603 /* Private Methods */
604
605
606 /* Async callback for com.canonical.Dee.Peer.List */
607 static void
on_list_received(GObject * source_object,GAsyncResult * res,gpointer user_data)608 on_list_received (GObject *source_object,
609 GAsyncResult *res,
610 gpointer user_data)
611 {
612 DeePeer *self;
613 DeePeerPrivate *priv;
614 GHashTable *peers, *old_peers_ht;
615 GSList *new_peers, *iter;
616 guint i;
617 GVariant *val, *_val;
618 const gchar **names;
619 gsize n_names;
620 GError *error;
621 GHashTableIter hiter;
622 gpointer hkey, hval;
623
624 error = NULL;
625 _val = g_dbus_connection_call_finish (G_DBUS_CONNECTION (source_object),
626 res, &error);
627 if (error != NULL)
628 {
629 if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
630 {
631 g_warning ("%s: Unable to list peers: %s", G_STRLOC, error->message);
632 }
633 g_error_free (error);
634 return;
635 }
636
637 g_return_if_fail (DEE_IS_PEER (user_data));
638 self = DEE_PEER (user_data);
639 priv = self->priv;
640
641 /* Unpack the wrapping struct from the reply */
642 val = g_variant_get_child_value (_val, 0);
643 g_variant_unref (_val);
644
645 names = g_variant_get_strv (val, &n_names);
646 trace_object (self, "Got list of %d peers", n_names);
647
648 /* We diff the current list of peers against the new list
649 * and emit signals accordingly. New peers are added to new_peers,
650 * and lost peers will remain in priv->peers: */
651 new_peers = NULL;
652 peers = g_hash_table_new_full (g_str_hash,
653 g_str_equal,
654 (GDestroyNotify)g_free,
655 NULL);
656
657 g_mutex_lock (priv->lock);
658 for (i = 0; i < n_names; i++)
659 {
660 g_hash_table_insert (peers, g_strdup (names[i]), NULL);
661 if (!g_hash_table_remove (priv->peers, names[i]))
662 {
663 /* The peer was not previously known */
664 new_peers = g_slist_prepend (new_peers, (gchar *) names[i]);
665 }
666 }
667
668 /* Signal about lost peers */
669 g_hash_table_iter_init (&hiter, priv->peers);
670 while (g_hash_table_iter_next (&hiter, &hkey, &hval))
671 {
672 g_signal_emit (self, _peer_signals[PEER_LOST], 0, hkey);
673 }
674
675 old_peers_ht = priv->peers;
676 priv->peers = peers;
677 g_mutex_unlock (priv->lock);
678
679 /* Signal about new peers */
680 for (iter = new_peers; iter; iter = iter->next)
681 {
682 emit_peer_found (self, (const gchar*)iter->data);
683 }
684
685 /* The return value of g_variant_get_strv() is a shallow copy */
686 g_free (names);
687 g_variant_unref (val);
688
689 /* Free just the array, not the strings - they are owned by 'peers' now */
690 g_slist_free (new_peers);
691 g_hash_table_destroy (old_peers_ht);
692 }
693
694 /* Install a DBus match rule, async, described by a printf-like format string.
695 * The match rule will be properly retracted when @self is finalized */
696 static void
install_match_rule(DeePeer * self,const char * rule,...)697 install_match_rule (DeePeer *self, const char *rule, ...)
698 {
699 DeePeerPrivate *priv;
700 gchar *f_rule;
701 va_list args;
702
703 g_return_if_fail (DEE_IS_PEER (self));
704 g_return_if_fail (rule != NULL);
705
706 priv = self->priv;
707
708 va_start (args, rule);
709 f_rule = g_strdup_vprintf (rule, args);
710 va_end (args);
711
712 /* By setting the error argument to NULL libdbus will use async mode
713 * for adding the match rule. We want that. */
714 g_dbus_connection_call (priv->connection,
715 "org.freedesktop.DBus",
716 "/org/freedesktop/dbus",
717 "org.freedesktop.DBus",
718 "AddMatch",
719 g_variant_new ("(s)", f_rule),
720 NULL, /* reply type */
721 G_DBUS_CALL_FLAGS_NONE,
722 -1,
723 NULL, /* cancellable */
724 NULL, /* callback */
725 NULL); /* user_data */
726
727 priv->match_rules = g_slist_prepend (priv->match_rules, f_rule);
728 }
729
730 static void
remove_match_rule(GDBusConnection * conn,const gchar * rule)731 remove_match_rule (GDBusConnection *conn, const gchar *rule)
732 {
733 g_dbus_connection_call (conn,
734 "org.freedesktop.DBus",
735 "/org/freedesktop/dbus",
736 "org.freedesktop.DBus",
737 "RemoveMatch",
738 g_variant_new ("(s)", rule),
739 NULL, /* reply type */
740 G_DBUS_CALL_FLAGS_NONE,
741 -1,
742 NULL, /* cancellable */
743 NULL, /* callback */
744 NULL); /* user_data */
745 }
746
747 static const gchar*
dee_peer_real_get_swarm_leader(DeePeer * self)748 dee_peer_real_get_swarm_leader (DeePeer *self)
749 {
750 return self->priv->swarm_leader;
751 }
752
753 static gboolean
dee_peer_real_is_swarm_leader(DeePeer * self)754 dee_peer_real_is_swarm_leader (DeePeer *self)
755 {
756 return self->priv->is_swarm_leader;
757 }
758
759 static GSList*
dee_peer_real_get_connections(DeePeer * self)760 dee_peer_real_get_connections (DeePeer *self)
761 {
762 GSList *list = NULL;
763
764 if (self->priv->connection)
765 {
766 list = g_slist_append (list, self->priv->connection);
767 }
768
769 return list;
770 }
771
772 static gchar**
dee_peer_real_list_peers(DeePeer * self)773 dee_peer_real_list_peers (DeePeer *self)
774 {
775 DeePeerPrivate *priv;
776 GHashTableIter iter;
777 gpointer key, value;
778 gchar **result;
779 int i;
780
781 priv = self->priv;
782 i = 0;
783
784 g_mutex_lock (priv->lock);
785 result = g_new (gchar*, g_hash_table_size (priv->peers) + 1);
786 g_hash_table_iter_init (&iter, priv->peers);
787 while (g_hash_table_iter_next (&iter, &key, &value))
788 {
789 result[i++] = g_strdup ((gchar*) key);
790 }
791 g_mutex_unlock (priv->lock);
792
793 result[i] = NULL;
794
795 return result;
796 }
797
798 /* Public Methods */
799
800 /**
801 * dee_peer_new:
802 * @swarm_name: The name of the swarm to join.
803 * Fx "org.example.DataProviders"
804 *
805 * Create a new #DeePeer. The peer will immediately connect to the swarm
806 * and start the peer discovery.
807 *
808 * Return value: (transfer full): A newly constructed #DeePeer.
809 * Free with g_object_unref().
810 */
811 DeePeer*
dee_peer_new(const gchar * swarm_name)812 dee_peer_new (const gchar* swarm_name)
813 {
814 g_return_val_if_fail (swarm_name != NULL, NULL);
815
816 return g_object_new (DEE_TYPE_PEER, "swarm-name", swarm_name, NULL);
817 }
818
819 /**
820 * dee_peer_is_swarm_leader:
821 * @self: a #DeePeer
822 *
823 * Return value: %TRUE if and only if this peer owns the swarm name on
824 * the session bus
825 */
826 gboolean
dee_peer_is_swarm_leader(DeePeer * self)827 dee_peer_is_swarm_leader (DeePeer *self)
828 {
829 g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
830
831 DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
832 return klass->is_swarm_leader (self);
833 }
834
835 /**
836 * dee_peer_get_swarm_leader:
837 * @self: a #DeePeer
838 *
839 * In case this peer is connected to a message bus, gets the unique DBus
840 * address of the current swarm leader, otherwise returns id of the leader.
841 *
842 * Return value: Unique DBus address of the current swarm leader,
843 * possibly %NULL if the leader has not been detected yet
844 */
845 const gchar*
dee_peer_get_swarm_leader(DeePeer * self)846 dee_peer_get_swarm_leader (DeePeer *self)
847 {
848 g_return_val_if_fail (DEE_IS_PEER (self), NULL);
849
850 DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
851 return klass->get_swarm_leader (self);
852 }
853
854 /**
855 * dee_peer_get_swarm_name:
856 * @self: a #DeePeer
857 *
858 * Gets the unique name for this swarm. The swarm leader is the Peer owning
859 * this name on the session bus.
860 *
861 * Return value: The swarm name
862 **/
863 const gchar*
dee_peer_get_swarm_name(DeePeer * self)864 dee_peer_get_swarm_name (DeePeer *self)
865 {
866 g_return_val_if_fail (DEE_IS_PEER (self), NULL);
867
868 return self->priv->swarm_name;
869 }
870
871 /**
872 * dee_peer_get_connections:
873 * @self: a #DeePeer
874 *
875 * Gets list of #GDBusConnection instances used by this #DeePeer instance.
876 *
877 * Return value: (transfer container) (element-type Gio.DBusConnection):
878 * List of connections.
879 */
880 GSList*
dee_peer_get_connections(DeePeer * self)881 dee_peer_get_connections (DeePeer *self)
882 {
883 g_return_val_if_fail (DEE_IS_PEER (self), NULL);
884
885 DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
886 return klass->get_connections (self);
887 }
888
889 /**
890 * dee_peer_list_peers:
891 * @self: a #DeePeer
892 *
893 * Gets list of all peers currently in this swarm.
894 *
895 * Return value: (transfer full): List of peers (free using g_strfreev()).
896 */
897 gchar**
dee_peer_list_peers(DeePeer * self)898 dee_peer_list_peers (DeePeer *self)
899 {
900 g_return_val_if_fail (DEE_IS_PEER (self), NULL);
901
902 DeePeerClass *klass = DEE_PEER_GET_CLASS (self);
903 return klass->list_peers (self);
904 }
905
906 /**
907 * dee_peer_is_swarm_owner:
908 * @self: a #DeePeer
909 *
910 * Gets the value of the :swarm-owner property.
911 *
912 * Note that this does NOT mean that the peer is leader of the swarm! Check also
913 * dee_peer_is_swarm_leader().
914 *
915 * Return value: TRUE if the :swarm-owner property was set during construction.
916 */
917 gboolean
dee_peer_is_swarm_owner(DeePeer * self)918 dee_peer_is_swarm_owner (DeePeer *self)
919 {
920 g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
921
922 return self->priv->swarm_owner;
923 }
924
925 static void
emit_peer_found(DeePeer * self,const gchar * name)926 emit_peer_found (DeePeer *self,
927 const gchar *name)
928 {
929 DeePeerPrivate *priv;
930
931 g_return_if_fail (DEE_IS_PEER(self));
932 g_return_if_fail (name != NULL);
933
934 priv = self->priv;
935
936 if (!g_str_equal (name, priv->own_name))
937 {
938 g_signal_emit (self, _peer_signals[PEER_FOUND], 0, name);
939 }
940 }
941
942 static void
set_swarm_name(DeePeer * self,const gchar * swarm_name)943 set_swarm_name (DeePeer *self,
944 const gchar *swarm_name)
945 {
946 DeePeerPrivate *priv;
947 gchar *dummy;
948
949 g_return_if_fail (DEE_IS_PEER (self));
950 g_return_if_fail (swarm_name != NULL);
951 priv = self->priv;
952
953 if (priv->swarm_name)
954 {
955 g_warning ("%s: Unable to set previously set swarm_name (%s) to (%s)",
956 G_STRLOC,
957 priv->swarm_name,
958 swarm_name);
959 return;
960 }
961
962 /* If swarm_name is org.example.MyService then the swarm_path will
963 * become /com/canonical/dee/org/example/MyService. Note that
964 * the actual object path of the peer is not used in the Swarm spec */
965
966 priv->swarm_name = g_strdup (swarm_name);
967 dummy = g_strdelimit (g_strdup(swarm_name), ".", '/');
968 priv->swarm_path = g_strdup_printf ("/com/canonical/dee/peer/%s", dummy);
969
970 g_free (dummy);
971 }
972
973 static void
dispose_weak_ref(gpointer data)974 dispose_weak_ref (gpointer data)
975 {
976 GWeakRef *weak_ref = (GWeakRef*) data;
977 g_weak_ref_clear (weak_ref);
978 g_free (data);
979 }
980
981 /* Called when we get the bus connection the first time. */
982 static void
on_bus_acquired(GDBusConnection * connection,const gchar * name,gpointer user_data)983 on_bus_acquired (GDBusConnection *connection,
984 const gchar *name,
985 gpointer user_data)
986 {
987 DeePeer *self;
988 DeePeerPrivate *priv;
989 GWeakRef *weak_ref;
990 GPtrArray *ptr_array;
991
992 g_return_if_fail (DEE_IS_PEER (user_data));
993
994 self = DEE_PEER (user_data);
995 priv = self->priv;
996 priv->connection = g_object_ref (connection);
997 priv->own_name = g_strdup (g_dbus_connection_get_unique_name (connection));
998
999 g_signal_emit (self, _peer_signals[CONNECTION_ACQUIRED], 0, priv->connection);
1000
1001 /* Using GPtrArray as a ref-count container for the weak ref */
1002 weak_ref = (GWeakRef*) g_new (GWeakRef, 1);
1003 g_weak_ref_init (weak_ref, self);
1004 ptr_array = g_ptr_array_new_full (1, dispose_weak_ref);
1005 g_ptr_array_add (ptr_array, weak_ref);
1006
1007 /* FIXME: the last param should be g_ptr_array_unref, but there's a bug
1008 * in gio that can cause a crash, we'll rather have a small leak than
1009 * random crashes.
1010 * https://bugzilla.gnome.org/show_bug.cgi?id=704568 */
1011 priv->filter_id = g_dbus_connection_add_filter (priv->connection,
1012 gdbus_message_filter,
1013 ptr_array,
1014 NULL);
1015
1016 /* Detect when someone joins the swarm */
1017 install_match_rule (self,
1018 "interface='org.freedesktop.DBus',"
1019 "member='RequestName',"
1020 "arg0='%s'",
1021 priv->swarm_name);
1022
1023 /* Listen for all signals on the Dee interface concerning this swarm */
1024 priv->dbus_signals_id =
1025 g_dbus_connection_signal_subscribe(priv->connection,
1026 NULL, /* sender */
1027 DEE_PEER_DBUS_IFACE, /* iface */
1028 NULL, /* member */
1029 NULL, /* object path */
1030 priv->swarm_name, /* arg0 */
1031 G_DBUS_SIGNAL_FLAGS_NONE,
1032 on_dbus_peer_signal, /* callback */
1033 self, /* user_data */
1034 NULL); /* user data free */
1035 }
1036
1037 static void
assume_leadership(DeePeer * self)1038 assume_leadership (DeePeer *self)
1039 {
1040 DeePeerPrivate *priv;
1041
1042 g_return_if_fail (DEE_IS_PEER (self));
1043
1044 priv = self->priv;
1045 if (priv->is_swarm_leader)
1046 {
1047 trace_object (self, "Leadership acquired, but we are already leaders");
1048 }
1049 else
1050 {
1051 trace_object (self, "Got swarm leadership");
1052
1053 /* The first time we become leaders we install a broad match rule
1054 * that triggers any time someone drops off the bus.
1055 * Please note that we don't bother cleaning up that rule in the
1056 * rare case we loose leadership (which only happens if someone
1057 * forcefully grabs the swarm name) */
1058 if (!priv->has_been_leader)
1059 {
1060 install_match_rule (self, "interface='org.freedesktop.DBus',"
1061 "member='NameOwnerChanged',"
1062 "arg2=''");
1063 }
1064
1065 priv->is_swarm_leader = TRUE;
1066 priv->has_been_leader = TRUE;
1067
1068 g_free (priv->swarm_leader);
1069 priv->swarm_leader = g_strdup (priv->own_name);
1070
1071 /* Emit a Ping so we can do a head count */
1072 emit_ping (self);
1073
1074 /* Signal emission must be a "tail call"
1075 * because we can in theory be finalized by
1076 * one of the callbacks */
1077 g_object_notify (G_OBJECT (self), "swarm-leader");
1078 }
1079
1080 }
1081
1082 /* GDBus callback from the name owner installed with g_bus_own_name().
1083 * Called when losing leadership. */
1084 static void
on_leadership_lost(GDBusConnection * connection,const gchar * name,gpointer user_data)1085 on_leadership_lost (GDBusConnection *connection,
1086 const gchar *name,
1087 gpointer user_data)
1088 {
1089 DeePeer *self;
1090 DeePeerPrivate *priv;
1091
1092 g_return_if_fail (DEE_IS_PEER (user_data));
1093
1094 self = DEE_PEER (user_data);
1095 priv = self->priv;
1096
1097 if (priv->is_swarm_leader)
1098 {
1099 /* We signal the change of swarm leadership in on_ping_received(),
1100 * only at that point do we know the unique name of the new leader */
1101 trace_object (self, "Lost swarm leadership");
1102 // FIXME. We ought to remove the Pong match rule, but it's not paramount
1103 priv->is_swarm_leader = FALSE;
1104 }
1105 else
1106 {
1107 trace_object (self, "Did not become leader");
1108 }
1109
1110 /* If this is the first time we are notified that we are not the leader
1111 * then request a roster from the leader */
1112 if (priv->is_first_name_check)
1113 {
1114 trace_object (self, "Requesting peer roster from leader");
1115 if (priv->list_cancellable)
1116 {
1117 g_cancellable_cancel (priv->list_cancellable);
1118 g_object_unref (priv->list_cancellable);
1119 }
1120 priv->list_cancellable = g_cancellable_new ();
1121 g_dbus_connection_call (priv->connection,
1122 priv->swarm_name,
1123 priv->swarm_path,
1124 DEE_PEER_DBUS_IFACE,
1125 "List",
1126 g_variant_new ("()"),
1127 NULL, /* reply type */
1128 G_DBUS_CALL_FLAGS_NONE,
1129 -1,
1130 priv->list_cancellable, /* cancellable */
1131 on_list_received, /* callback */
1132 self); /* user_data */
1133 priv->is_first_name_check = FALSE;
1134 }
1135 }
1136
1137 /* GDBus callback from the name owner installed with g_bus_own_name().
1138 * Called when we become leaders. */
1139 static void
on_leadership_acquired(GDBusConnection * connection,const gchar * name,gpointer user_data)1140 on_leadership_acquired (GDBusConnection *connection,
1141 const gchar *name,
1142 gpointer user_data)
1143 {
1144 g_return_if_fail (DEE_IS_PEER (user_data));
1145
1146 assume_leadership (DEE_PEER (user_data));
1147 }
1148
1149 /* Callback from the GDBus name watcher installed with g_bus_watch_name() */
1150 static void
on_leadership_changed(GDBusConnection * connection,const gchar * name,const gchar * name_owner,gpointer user_data)1151 on_leadership_changed (GDBusConnection *connection,
1152 const gchar *name,
1153 const gchar *name_owner,
1154 gpointer user_data)
1155 {
1156 DeePeer *self;
1157 DeePeerPrivate *priv;
1158
1159 g_return_if_fail (DEE_IS_PEER (user_data));
1160
1161 self = DEE_PEER (user_data);
1162 priv = self->priv;
1163
1164 /* Don't bother if we already know this leader */
1165 if (g_strcmp0 (priv->swarm_leader, name_owner) == 0)
1166 return;
1167
1168 /* At this point we assume we have a new leader */
1169 if (g_strcmp0 (priv->own_name, name_owner) == 0)
1170 assume_leadership (self);
1171 else
1172 {
1173 g_free (priv->swarm_leader);
1174 priv->swarm_leader = g_strdup (name_owner);
1175 priv->is_swarm_leader = FALSE;
1176 g_object_notify (G_OBJECT (self), "swarm-leader");
1177 }
1178 }
1179
1180 /* Callback from gdbus_message_filter() for custom match rules
1181 * Indicates that @peer_address joined the swarm.
1182 * This method is thread safe */
1183 static void
on_join_received(DeePeer * self,const gchar * peer_address)1184 on_join_received (DeePeer *self,
1185 const gchar *peer_address)
1186 {
1187 DeePeerPrivate *priv;
1188
1189 g_return_if_fail (DEE_IS_PEER (self));
1190 g_return_if_fail (peer_address != NULL);
1191
1192 trace_object (self, "Found peer %s", peer_address);
1193 priv = self->priv;
1194
1195 /* If the peer is already known it must have tried to acquire the swarm name
1196 * twice... Just ignore it */
1197 g_mutex_lock (priv->lock);
1198 if (g_hash_table_lookup_extended (priv->peers, peer_address, NULL, NULL))
1199 {
1200 g_mutex_unlock (priv->lock);
1201 return;
1202 }
1203
1204 g_hash_table_insert (priv->peers, g_strdup (peer_address), NULL);
1205 g_mutex_unlock (priv->lock);
1206
1207 emit_peer_found (self, peer_address);
1208 }
1209
1210 /* Callback from _gdbus_message_filter() for custom match rules
1211 * Indicates that @peer_address left the swarm */
1212 static void
on_bye_received(DeePeer * self,const gchar * peer_address)1213 on_bye_received (DeePeer *self,
1214 const gchar *peer_address)
1215 {
1216 DeePeerPrivate *priv;
1217 gboolean removed;
1218
1219 g_return_if_fail (DEE_IS_PEER (self));
1220 g_return_if_fail (peer_address != NULL);
1221
1222 trace_object (self, "Bye %s", peer_address);
1223 priv = self->priv;
1224
1225 g_mutex_lock (priv->lock);
1226 removed = g_hash_table_remove (self->priv->peers, peer_address);
1227 g_mutex_unlock (priv->lock);
1228
1229 if (removed)
1230 {
1231 trace_object (self, "Leader said Bye to %s", peer_address);
1232 g_signal_emit (self, _peer_signals[PEER_LOST], 0, peer_address);
1233 }
1234 else
1235 {
1236 trace_object (self, "Unknown peer '%s' dropped out of the swarm",
1237 peer_address);
1238 }
1239
1240 }
1241
1242 /* Broadcast a Bye signal to to notify the swarm that someone left.
1243 * Only call this method as swarm leader - that's the contract
1244 * of the Swarm spec.
1245 * This method is thread safe */
1246 static void
emit_bye(DeePeer * self,const gchar * peer_address)1247 emit_bye (DeePeer *self,
1248 const gchar *peer_address)
1249 {
1250 DeePeerPrivate *priv;
1251
1252 g_return_if_fail (DEE_IS_PEER (self));
1253 g_return_if_fail (self->priv->is_swarm_leader);
1254 g_return_if_fail (self->priv->connection != NULL);
1255 g_return_if_fail (peer_address != NULL);
1256
1257 trace_object (self, "Emit Bye(%s)", peer_address);
1258
1259 g_signal_emit (self, _peer_signals[PEER_LOST], 0, peer_address);
1260
1261 priv = self->priv;
1262 g_dbus_connection_emit_signal (priv->connection,
1263 NULL, /* destination */
1264 priv->swarm_path, /* object path */
1265 DEE_PEER_DBUS_IFACE, /* interface */
1266 "Bye", /* signal name */
1267 g_variant_new ("(ss)",
1268 priv->swarm_name, peer_address),
1269 NULL); /* error */
1270 }
1271
1272 /* Timeout started when we receive a Ping. When this timeout triggers
1273 * we do a diff of priv->head_count and priv->peers and emit peer-lost
1274 * as appropriate. We don't need to emit peer-found because that is already
1275 * done in when receiving the Pongs from the peers */
1276 static gboolean
on_head_count_complete(DeePeer * self)1277 on_head_count_complete (DeePeer *self)
1278 {
1279 DeePeerPrivate *priv;
1280 GHashTable *new_peers;
1281 gpointer hkey, hval;
1282 GHashTableIter hiter;
1283 GSList *iter;
1284
1285 g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
1286
1287 priv = self->priv;
1288
1289 /* First we build a new_peers hash set with the names of the
1290 * head counted peers. Then we diff the old and the new peers
1291 * sets and emit peer-lost appropriately */
1292 new_peers = g_hash_table_new_full (g_str_hash,
1293 g_str_equal,
1294 (GDestroyNotify) g_free,
1295 NULL);
1296
1297 /* Build new_peers hash set */
1298 iter = priv->head_count;
1299 for (iter = priv->head_count; iter; iter = iter->next)
1300 {
1301 g_hash_table_insert (new_peers, g_strdup (iter->data), NULL);
1302 }
1303
1304 /* Emit peer-lost and Bye on peers that didn't emit Pong in due time */
1305 g_mutex_lock (priv->lock);
1306 g_hash_table_iter_init (&hiter, priv->peers);
1307 while (g_hash_table_iter_next (&hiter, &hkey, &hval))
1308 {
1309 if (!g_hash_table_lookup_extended (new_peers, hkey, NULL, NULL))
1310 {
1311 if (priv->is_swarm_leader)
1312 emit_bye (self, hkey);
1313 else
1314 g_signal_emit (self, _peer_signals[PEER_LOST], 0, hkey);
1315
1316 }
1317 }
1318
1319 /* Swap old and new peers hash sets */
1320 g_hash_table_destroy (priv->peers);
1321 priv->peers = new_peers;
1322 g_mutex_unlock (priv->lock);
1323
1324 /* Unregister the head count timeout source. And reset head counting mode */
1325 priv->head_count_source = 0;
1326 g_slist_foreach (priv->head_count, (GFunc) g_free, NULL);
1327 g_slist_free (priv->head_count);
1328 priv->head_count = NULL;
1329
1330 return FALSE;
1331 }
1332
1333 /* Indicates that @leader_address send a Ping to the swarm to
1334 * initiate a head count */
1335 static void
on_ping_received(DeePeer * self,const gchar * leader_address)1336 on_ping_received (DeePeer *self,
1337 const gchar *leader_address)
1338 {
1339 DeePeerPrivate *priv;
1340
1341 g_return_if_fail (DEE_IS_PEER (self));
1342 g_return_if_fail (leader_address != NULL);
1343
1344 priv = self->priv;
1345
1346 trace_object (self, "Got Ping from: %s", leader_address);
1347
1348 /* When we receive a Ping (and note that the swarm leader will receive its
1349 * own Ping as well) we enter a "head count mode" where we will consider
1350 * all peers that haven't given us a Pong within a certain timeout for lost.
1351 *
1352 * We indicate that we are in head counting mode by setting
1353 * priv->head_count != NULL
1354 */
1355 if (priv->head_count)
1356 {
1357 g_slist_foreach (priv->head_count, (GFunc) g_free, NULL);
1358 g_slist_free (priv->head_count);
1359 }
1360
1361 priv->head_count = g_slist_prepend (NULL, g_strdup (priv->own_name));
1362 if (priv->head_count_source != 0)
1363 {
1364 /* Reset the timer if we got another ping */
1365 g_source_remove (priv->head_count_source);
1366 }
1367 priv->head_count_source = g_timeout_add (500,
1368 (GSourceFunc) on_head_count_complete,
1369 self);
1370
1371 /* The DeePeer protocol says we must respond with a Pong to any Ping */
1372 emit_pong(self);
1373 }
1374
1375 /* Indicates that @peer_address has emitted a Pong */
1376 static void
on_pong_received(DeePeer * self,const gchar * peer_address)1377 on_pong_received (DeePeer *self,
1378 const gchar *peer_address)
1379 {
1380 DeePeerPrivate *priv;
1381
1382 g_return_if_fail (DEE_IS_PEER (self));
1383 g_return_if_fail (peer_address != NULL);
1384
1385 priv = self->priv;
1386 trace_object (self, "Got pong %s", peer_address);
1387
1388 g_mutex_lock (priv->lock);
1389 if (!g_hash_table_lookup_extended (priv->peers, peer_address, NULL, NULL))
1390 {
1391 g_hash_table_insert (priv->peers, g_strdup (peer_address), NULL);
1392
1393 emit_peer_found (self, peer_address);
1394 }
1395 g_mutex_unlock (priv->lock);
1396
1397 /* If we are in head counting mode register this Ping */
1398 if (priv->head_count)
1399 priv->head_count = g_slist_prepend (priv->head_count,
1400 g_strdup (peer_address));
1401 }
1402
1403 static void
on_dbus_peer_signal(GDBusConnection * connection,const gchar * sender_name,const gchar * object_path,const gchar * interface_name,const gchar * signal_name,GVariant * parameters,gpointer user_data)1404 on_dbus_peer_signal (GDBusConnection *connection,
1405 const gchar *sender_name,
1406 const gchar *object_path,
1407 const gchar *interface_name,
1408 const gchar *signal_name,
1409 GVariant *parameters,
1410 gpointer user_data)
1411 {
1412 DeePeer *self;
1413 gchar *peer_address = NULL;
1414
1415 g_return_if_fail (DEE_IS_PEER (user_data));
1416
1417 self = DEE_PEER (user_data);
1418
1419 if (g_strcmp0 ("Bye", signal_name) == 0)
1420 {
1421 g_variant_get (parameters, "(ss)", NULL, &peer_address);
1422 on_bye_received (self, peer_address);
1423 }
1424 else if (g_strcmp0 ("Ping", signal_name) == 0)
1425 on_ping_received (self, sender_name);
1426 else if (g_strcmp0 ("Pong", signal_name) == 0)
1427 on_pong_received (self, sender_name);
1428 else
1429 g_critical ("Unexpected signal from peer %s: %s.%s",
1430 sender_name, interface_name, signal_name);
1431 }
1432
1433 /* Broadcast a Ping signal to do a head-count on the swarm.
1434 * Only call this method as swarm leader - that's the contract
1435 * of the Swarm spec.
1436 * This method is thread safe */
1437 static void
emit_ping(DeePeer * self)1438 emit_ping (DeePeer *self)
1439 {
1440 DeePeerPrivate *priv;
1441
1442 g_return_if_fail (DEE_IS_PEER (self));
1443 g_return_if_fail (self->priv->is_swarm_leader);
1444 g_return_if_fail (self->priv->connection != NULL);
1445
1446 trace_object (self, "Emit ping");
1447
1448 priv = self->priv;
1449 g_dbus_connection_emit_signal (priv->connection,
1450 NULL, /* destination */
1451 priv->swarm_path, /* object path */
1452 DEE_PEER_DBUS_IFACE, /* interface */
1453 "Ping", /* signal name */
1454 g_variant_new ("(s)", priv->swarm_name),
1455 NULL); /* error */
1456 }
1457
1458 /* Broadcast a Pong signal as a response to a Ping.
1459 * This method is thread safe */
1460 static void
emit_pong(DeePeer * self)1461 emit_pong (DeePeer *self)
1462 {
1463 DeePeerPrivate *priv;
1464
1465 g_return_if_fail (DEE_IS_PEER (self));
1466 g_return_if_fail (self->priv->connection != NULL);
1467
1468 trace_object (self, "Emit pong");
1469
1470 priv = self->priv;
1471 g_dbus_connection_emit_signal (priv->connection,
1472 NULL, /* destination */
1473 priv->swarm_path, /* object path */
1474 DEE_PEER_DBUS_IFACE, /* interface */
1475 "Pong", /* signal name */
1476 g_variant_new ("(s)", priv->swarm_name),
1477 NULL); /* error */
1478 }
1479
1480 /* Return floating variant of type '(as)' with unique DBus names of all peers.
1481 * This method is thread safe */
1482 static GVariant*
build_peer_list(DeePeer * self)1483 build_peer_list (DeePeer *self)
1484 {
1485 DeePeerPrivate *priv;
1486 GHashTableIter iter;
1487 GVariantBuilder b;
1488 gpointer key, val;
1489
1490 g_return_val_if_fail (DEE_IS_PEER (self), FALSE);
1491
1492 priv = self->priv;
1493
1494 g_variant_builder_init (&b, G_VARIANT_TYPE ("(as)"));
1495 g_variant_builder_open (&b, G_VARIANT_TYPE ("as"));
1496
1497 g_mutex_lock (priv->lock);
1498 g_hash_table_iter_init (&iter, priv->peers);
1499 while (g_hash_table_iter_next (&iter, &key, &val))
1500 {
1501 g_variant_builder_add (&b, "s", key);
1502 }
1503 g_mutex_unlock (priv->lock);
1504
1505 g_variant_builder_close (&b);
1506 return g_variant_builder_end (&b);
1507 }
1508
1509 /* This method is thread safe */
1510 static gboolean
check_method(GDBusMessage * msg,const gchar * iface,const gchar * member,const gchar * path)1511 check_method (GDBusMessage *msg,
1512 const gchar *iface,
1513 const gchar *member,
1514 const gchar *path)
1515 {
1516 return msg != NULL &&
1517 G_DBUS_MESSAGE_TYPE_METHOD_CALL == g_dbus_message_get_message_type (msg) &&
1518 (iface == NULL || g_strcmp0 (g_dbus_message_get_interface (msg), iface) == 0) &&
1519 (member == NULL || g_strcmp0 (g_dbus_message_get_member (msg), member) == 0) &&
1520 (path == NULL || g_strcmp0 (g_dbus_message_get_path (msg), path) == 0);
1521 }
1522
1523 /* This method is thread safe */
1524 static gboolean
check_signal(GDBusMessage * msg,const gchar * iface,const gchar * member,const gchar * path)1525 check_signal (GDBusMessage *msg,
1526 const gchar *iface,
1527 const gchar *member,
1528 const gchar *path)
1529 {
1530 return msg != NULL &&
1531 G_DBUS_MESSAGE_TYPE_SIGNAL == g_dbus_message_get_message_type (msg) &&
1532 (iface == NULL || g_strcmp0 (g_dbus_message_get_interface (msg), iface) == 0) &&
1533 (member == NULL || g_strcmp0 (g_dbus_message_get_member (msg), member) == 0) &&
1534 (path == NULL || g_strcmp0 (g_dbus_message_get_path (msg), path) == 0);
1535 }
1536
1537 /* Used to transfer data to the mainloop.
1538 * Use only for good, not evil, and only from gdbus_message_filter() */
1539 static gboolean
transfer_to_mainloop(gpointer * args)1540 transfer_to_mainloop (gpointer *args)
1541 {
1542 GPtrArray *ptr_array;
1543 GWeakRef *weak_ref;
1544 GObject *object;
1545 GFunc cb = (GFunc) args[0];
1546
1547 ptr_array = (GPtrArray*) args[1];
1548 weak_ref = (GWeakRef*) g_ptr_array_index (ptr_array, 0);
1549
1550 object = (GObject*) g_weak_ref_get (weak_ref);
1551 if (object != NULL)
1552 {
1553 cb (object, args[2]);
1554 g_object_unref (object);
1555 }
1556
1557 g_ptr_array_unref (ptr_array);
1558 g_free (args[2]);
1559 g_free (args);
1560
1561 return FALSE;
1562 }
1563
1564 /* Callback applied to all incoming DBus messages. We use this to grab
1565 * messages for our match rules and dispatch to the right on_*_received
1566 * function.
1567 * WARNING: This callback is run in the GDBus message handling thread -
1568 * and NOT in the mainloop! */
1569 static GDBusMessage*
gdbus_message_filter(GDBusConnection * connection,GDBusMessage * msg,gboolean incoming,gpointer user_data)1570 gdbus_message_filter (GDBusConnection *connection,
1571 GDBusMessage *msg,
1572 gboolean incoming,
1573 gpointer user_data)
1574 {
1575 DeePeer *self;
1576 DeePeerPrivate *priv;
1577 GVariant *body;
1578 GDBusMessageType msg_type;
1579 const gchar *sender_address;
1580 gpointer *data;
1581 GPtrArray *ptr_array;
1582 GWeakRef *weak_ref;
1583
1584 ptr_array = (GPtrArray*) user_data;
1585 weak_ref = (GWeakRef*) g_ptr_array_index (ptr_array, 0);
1586 body = g_dbus_message_get_body (msg);
1587 sender_address = g_dbus_message_get_sender (msg);
1588 msg_type = g_dbus_message_get_message_type (msg);
1589
1590 /* We have no business with outgoing messages */
1591 if (!incoming)
1592 return msg;
1593
1594 /* We're only interested in method calls and signals */
1595 if (msg_type != G_DBUS_MESSAGE_TYPE_METHOD_CALL &&
1596 msg_type != G_DBUS_MESSAGE_TYPE_SIGNAL)
1597 return msg;
1598
1599 /*trace ("FILTER: %p", user_data);
1600 trace ("Msg filter: From: %s, Iface: %s, Member: %s",
1601 dbus_message_get_sender (msg),
1602 dbus_message_get_interface (msg),
1603 dbus_message_get_member (msg));*/
1604
1605 /* Important note: Apps consuming this lib will likely install custom match
1606 * rules which will trigger this filter. Hence we must do very
1607 * strict matching before we dispatch our methods */
1608
1609 if (check_method (msg, "org.freedesktop.DBus", "RequestName", NULL) &&
1610 g_strcmp0 (sender_address, g_dbus_connection_get_unique_name (connection)) != 0 &&
1611 body != NULL)
1612 {
1613 gchar *swarm_name;
1614
1615 self = (DeePeer*) g_weak_ref_get (weak_ref);
1616 if (self == NULL) return msg;
1617 priv = self->priv;
1618
1619 g_variant_get (body, "(su)", &swarm_name, NULL);
1620 if (g_strcmp0 (swarm_name, priv->swarm_name) == 0)
1621 {
1622 /* Call on_join_received() in the main loop */
1623 data = g_new (gpointer, 3);
1624 data[0] = on_join_received;
1625 data[1] = g_ptr_array_ref (ptr_array);
1626 data[2] = g_strdup (sender_address);
1627 g_idle_add ((GSourceFunc) transfer_to_mainloop, data);
1628 }
1629
1630 g_object_unref (self);
1631 g_free (swarm_name);
1632 }
1633 else if (check_signal (msg, "org.freedesktop.DBus", "NameOwnerChanged", NULL) && body != NULL)
1634 {
1635 gchar *old_address, *new_address, *peer_address;
1636 gboolean should_emit_bye;
1637
1638 self = (DeePeer*) g_weak_ref_get (weak_ref);
1639 if (self == NULL) return msg;
1640 priv = self->priv;
1641
1642 g_variant_get (body, "(sss)", &peer_address, &old_address, &new_address);
1643
1644 /* Check if a known peer dropped off the bus and emit the Bye signal
1645 * if we are the swarm leaders */
1646 g_mutex_lock (priv->lock);
1647 should_emit_bye = priv->is_swarm_leader &&
1648 g_strcmp0 (peer_address, old_address) == 0 &&
1649 g_strcmp0 (new_address, "") == 0 &&
1650 g_strcmp0 (peer_address, g_dbus_connection_get_unique_name (connection)) != 0 &&
1651 g_hash_table_lookup_extended (priv->peers,
1652 peer_address,
1653 NULL,
1654 NULL);
1655 g_mutex_unlock (priv->lock);
1656
1657 if (should_emit_bye)
1658 {
1659 /* Call emit_bye() in the main loop */
1660 data = g_new (gpointer, 3);
1661 data[0] = emit_bye;
1662 data[1] = g_ptr_array_ref (ptr_array);
1663 data[2] = peer_address; // own
1664 g_idle_add ((GSourceFunc) transfer_to_mainloop, data);
1665 peer_address = NULL;
1666 }
1667 g_object_unref (self);
1668 g_free (old_address);
1669 g_free (new_address);
1670 g_free (peer_address);
1671 }
1672 else
1673 {
1674 self = (DeePeer*) g_weak_ref_get (weak_ref);
1675 if (self == NULL) return msg;
1676 priv = self->priv;
1677
1678 if (check_method (msg, DEE_PEER_DBUS_IFACE, "List", priv->swarm_path))
1679 {
1680 /* We don't want to go through the whole GDBus
1681 * interface/introspection setup just to export the List method.
1682 * We just handle this particular method inline */
1683 GDBusMessage *reply;
1684 reply = g_dbus_message_new_method_reply (msg);
1685 g_dbus_message_set_body (reply, build_peer_list (self));
1686 g_dbus_connection_send_message (connection,
1687 reply,
1688 G_DBUS_SEND_MESSAGE_FLAGS_NONE,
1689 NULL, /* out serial */
1690 NULL); /* error */
1691 g_object_unref (reply);
1692
1693 g_object_unref (self);
1694 /* Convince GDBus that we handled this message by returning NULL */
1695 return NULL;
1696 }
1697 g_object_unref (self);
1698 }
1699
1700 return msg;
1701 }
1702