1 /*
2  * gibber-r-multicast-sender.c - Source for GibberRMulticastSender
3  * Copyright (C) 2006-2007 Collabora Ltd.
4  * @author Sjoerd Simons <sjoerd@luon.net>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19  */
20 
21 
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 
26 #include "gibber-r-multicast-sender.h"
27 #include "gibber-util.h"
28 #include "gibber-signals-marshal.h"
29 
30 #define DEBUG_FLAG DEBUG_RMULTICAST_SENDER
31 #include "gibber-debug.h"
32 
33 #define DEBUG_SENDER(sender, format, ...) \
34   DEBUG("%s %x: " format, sender->name, sender->id, ##__VA_ARGS__)
35 
36 #define PACKET_CACHE_SIZE 256
37 
38 #define MIN_DO_REPAIR_TIMEOUT 50
39 #define MAX_DO_REPAIR_TIMEOUT 100
40 
41 #define MIN_INITIAL_REPAIR_TIMEOUT 150
42 #define MAX_INITIAL_REPAIR_TIMEOUT 250
43 
44 #define MIN_REPAIR_TIMEOUT 500
45 #define MAX_REPAIR_TIMEOUT 800
46 
47 #define MIN_FIRST_WHOIS_TIMEOUT 50
48 #define MAX_FIRST_WHOIS_TIMEOUT 200
49 
50 #define MIN_WHOIS_TIMEOUT 400
51 #define MAX_WHOIS_TIMEOUT 600
52 
53 #define MIN_WHOIS_REPLY_TIMEOUT 50
54 #define MAX_WHOIS_REPLY_TIMEOUT 200
55 
56 /* At least one packet must be popped every 5 minutes. Reliable keepalives
57  * are sent out every three minutes. */
58 #define MAX_PROGRESS_TIMEOUT 300000
59 
60 /* The sender's name should be discovered within about 10 seconds or else it is
61  * faulty */
62 #define NAME_DISCOVERY_TIME  10000
63 
64 #define GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(o)  \
65   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GIBBER_TYPE_R_MULTICAST_SENDER, \
66     GibberRMulticastSenderPrivate))
67 
68 /* private structure */
69 typedef struct _GibberRMulticastSenderPrivate GibberRMulticastSenderPrivate;
70 
71 static void set_state (GibberRMulticastSender *sender,
72    GibberRMulticastSenderState state);
73 
74 struct _GibberRMulticastSenderPrivate
75 {
76   gboolean dispose_has_run;
77   /* hash table with packets */
78   GHashTable *packet_cache;
79 
80   /* Table with acks per sender
81    * guint32 * => owned AckInfo * */
82   GHashTable *acks;
83 
84   /* Sendergroup to which we belong */
85   GibberRMulticastSenderGroup *group;
86 
87   /* Very first packet number in the current window */
88   guint32 first_packet;
89 
90   /* whois reply/request timer */
91   guint whois_timer;
92 
93   /* timer until a failure event occurs */
94   guint fail_timer;
95 
96   /* Whether we are holding back data currently */
97   gboolean holding_data;
98   guint32 holding_point;
99 
100   /* Whether we know the data starting point or not */
101   gboolean start_data;
102   guint32 start_point;
103 
104   /* Endpoint is just there in case we are in failure mode */
105   guint32 end_point;
106 };
107 
108 typedef struct {
109   guint32 sender_id;
110   guint32 packet_id;
111   /* First packet that had this ack */
112   guint32 first_packet_id;
113 } AckInfo;
114 
115 static void
ack_info_free(gpointer data)116 ack_info_free (gpointer data)
117 {
118   g_slice_free (AckInfo, data);
119 }
120 
121 static AckInfo *
ack_info_new(guint32 sender_id)122 ack_info_new (guint32 sender_id)
123 {
124   AckInfo *result;
125   result = g_slice_new0 (AckInfo);
126   result->sender_id = sender_id;
127   return result;
128 }
129 
130 
131 struct _group_ht_data {
132   GibberRMulticastSenderGroup *group;
133   GibberRMulticastSender *target;
134   GibberRMulticastSender *sender;
135 };
136 
137 static AckInfo *
138 gibber_r_multicast_sender_get_ackinfo (GibberRMulticastSender *sender,
139     guint32 sender_id);
140 
141 GibberRMulticastSenderGroup *
gibber_r_multicast_sender_group_new(void)142 gibber_r_multicast_sender_group_new (void)
143 {
144   GibberRMulticastSenderGroup *result;
145   result = g_slice_new0 (GibberRMulticastSenderGroup);
146 
147   result->senders = g_hash_table_new_full (g_direct_hash, g_direct_equal,
148       NULL, g_object_unref);
149   result->pop_queue = g_queue_new ();
150   result->pending_removal = g_ptr_array_new ();
151   return result;
152 }
153 
154 void
gibber_r_multicast_sender_group_free(GibberRMulticastSenderGroup * group)155 gibber_r_multicast_sender_group_free (GibberRMulticastSenderGroup *group)
156 {
157   GHashTable *h;
158   guint i;
159 
160   g_assert (group->popping == FALSE);
161 
162   h = group->senders;
163   group->senders = NULL;
164   g_hash_table_unref (h);
165 
166   for (i = 0; i < group->pending_removal->len ; i++)
167     {
168       g_object_unref (G_OBJECT (
169         g_ptr_array_index (group->pending_removal, i)));
170     }
171 
172   g_ptr_array_unref (group->pending_removal);
173 
174   g_queue_free (group->pop_queue);
175   g_slice_free (GibberRMulticastSenderGroup, group);
176 }
177 
178 static void
stop_sender(gpointer key,gpointer value,gpointer user_data)179 stop_sender (gpointer key, gpointer value, gpointer user_data)
180 {
181   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER(value);
182 
183   gibber_r_multicast_sender_stop (sender);
184 }
185 
186 void
gibber_r_multicast_sender_group_stop(GibberRMulticastSenderGroup * group)187 gibber_r_multicast_sender_group_stop (GibberRMulticastSenderGroup *group)
188 {
189   g_hash_table_foreach (group->senders, stop_sender, NULL);
190   group->stopped = TRUE;
191 }
192 
193 void
gibber_r_multicast_sender_group_add(GibberRMulticastSenderGroup * group,GibberRMulticastSender * sender)194 gibber_r_multicast_sender_group_add (GibberRMulticastSenderGroup *group,
195     GibberRMulticastSender *sender)
196 {
197   DEBUG ("Adding %x to sender group", sender->id);
198   g_hash_table_insert (group->senders, GUINT_TO_POINTER (sender->id), sender);
199 }
200 
201 
202 GibberRMulticastSender *
gibber_r_multicast_sender_group_lookup(GibberRMulticastSenderGroup * group,guint32 sender_id)203 gibber_r_multicast_sender_group_lookup (GibberRMulticastSenderGroup *group,
204     guint32 sender_id)
205 {
206   return g_hash_table_lookup (group->senders, GUINT_TO_POINTER (sender_id));
207 }
208 
209 static gboolean
find_by_name(gpointer key,gpointer value,gpointer user_data)210 find_by_name (gpointer key, gpointer value, gpointer user_data)
211 {
212   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
213   const gchar *name = (gchar *) user_data;
214 
215   if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL)
216     return FALSE;
217 
218   return !gibber_strdiff (sender->name, name);
219 }
220 
221 GibberRMulticastSender *
gibber_r_multicast_sender_group_lookup_by_name(GibberRMulticastSenderGroup * group,const gchar * name)222 gibber_r_multicast_sender_group_lookup_by_name (
223     GibberRMulticastSenderGroup *group, const gchar *name)
224 {
225   return g_hash_table_find (group->senders, find_by_name, (gpointer) name);
226 }
227 
228 static void
cleanup_acks(gpointer key,gpointer value,gpointer user_data)229 cleanup_acks (gpointer key, gpointer value, gpointer user_data)
230 {
231   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
232   GibberRMulticastSenderPrivate *priv =
233      GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
234 
235   g_hash_table_remove (priv->acks, (guint32 *) user_data);
236 }
237 
238 static void
gibber_r_multicast_sender_group_gc_acks(GibberRMulticastSenderGroup * group,guint32 sender_id)239 gibber_r_multicast_sender_group_gc_acks (GibberRMulticastSenderGroup *group,
240     guint32 sender_id)
241 {
242   guint i;
243   GibberRMulticastSender *s;
244   /* If there are no more senders for sender_id in the list, clean up the acks
245    */
246   if (g_hash_table_lookup (group->senders, GUINT_TO_POINTER (sender_id))
247       != NULL)
248     return;
249 
250   for (i = 0; i < group->pending_removal->len ; i++)
251     {
252       s = GIBBER_R_MULTICAST_SENDER (
253           g_ptr_array_index (group->pending_removal, i));
254       if (s->id == sender_id)
255         return;
256     }
257 
258   g_hash_table_foreach (group->senders, cleanup_acks, &sender_id);
259 }
260 
261 void
gibber_r_multicast_sender_group_remove(GibberRMulticastSenderGroup * group,guint32 sender_id)262 gibber_r_multicast_sender_group_remove (GibberRMulticastSenderGroup *group,
263     guint32 sender_id)
264 {
265   GibberRMulticastSender *s;
266 
267   DEBUG ("Removing %x from sender group", sender_id);
268 
269   s = g_hash_table_lookup (group->senders, GUINT_TO_POINTER(sender_id));
270 
271   if (s == NULL)
272     {
273       DEBUG ("Can't remove unknown sender id: %x", sender_id);
274       return;
275     }
276 
277   g_queue_remove (group->pop_queue, s);
278   set_state (s, GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL);
279 
280   if (gibber_r_multicast_sender_packet_cache_size (s) > 0)
281     {
282       DEBUG ("Keeping %x in cache, %d items left", sender_id,
283           gibber_r_multicast_sender_packet_cache_size (s));
284       gibber_r_multicast_sender_stop (s);
285       g_hash_table_steal (group->senders, GUINT_TO_POINTER (sender_id));
286       g_ptr_array_add (group->pending_removal, s);
287     }
288   else
289    {
290      g_hash_table_remove (group->senders, GUINT_TO_POINTER(sender_id));
291      gibber_r_multicast_sender_group_gc_acks (group, sender_id);
292    }
293 
294 }
295 
296 static void
create_sender_array(gpointer key,gpointer value,gpointer user_data)297 create_sender_array (gpointer key, gpointer value, gpointer user_data)
298 {
299   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
300   GArray *array = (GArray *) user_data;
301   AckInfo info;
302 
303   if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
304     return;
305 
306   info.sender_id =  sender->id;
307   info.packet_id = sender->next_input_packet;
308 
309   g_array_append_val (array, info);
310 }
311 
312 static void
update_sender_acks(gpointer key,gpointer value,gpointer user_data)313 update_sender_acks (gpointer key, gpointer value, gpointer user_data)
314 {
315   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
316   GArray *array = (GArray *) user_data;
317   guint i;
318 
319   if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
320     return;
321 
322   for (i = 0; i < array->len ; i++)
323     {
324       AckInfo *info = &g_array_index (array, AckInfo, i);
325       AckInfo *ack;
326 
327       if (sender->id == info->sender_id)
328         continue;
329 
330       if ((ack = gibber_r_multicast_sender_get_ackinfo (sender,
331            info->sender_id)) != NULL)
332         {
333           if (gibber_r_multicast_packet_diff (ack->packet_id,
334                info->packet_id) > 0)
335             info->packet_id = ack->packet_id;
336         }
337       else
338        {
339          g_array_remove_index_fast (array, i);
340          /* The last element is now placed at location i, so retry i */
341          i--;
342          continue;
343        }
344     }
345 }
346 
347 static AckInfo *
get_direct_ack(GibberRMulticastSender * sender,GibberRMulticastSender * target)348 get_direct_ack (GibberRMulticastSender *sender, GibberRMulticastSender *target)
349 {
350   AckInfo *ack;
351 
352   ack = gibber_r_multicast_sender_get_ackinfo (sender, target->id);
353 
354   if (G_LIKELY (ack != NULL))
355    {
356      /* Returning the direct ack if there is one */
357      if (G_LIKELY (gibber_r_multicast_packet_diff (
358            target->next_output_packet, ack->packet_id) >= 0))
359        return ack;
360    }
361 
362   return NULL;
363 }
364 
365 static gboolean
find_indirect_ack(gpointer key,gpointer value,gpointer user_data)366 find_indirect_ack (gpointer key, gpointer value, gpointer user_data)
367 {
368   struct _group_ht_data *hd = (struct _group_ht_data *) user_data;
369   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
370   AckInfo *target_ack, *ack;
371 
372   if (sender == hd->sender)
373     return FALSE;
374 
375   target_ack = get_direct_ack (sender, hd->target);
376 
377   if (target_ack == NULL)
378     return FALSE;
379 
380   ack = gibber_r_multicast_sender_get_ackinfo (hd->sender, sender->id);
381   if (ack == NULL)
382     return FALSE;
383 
384   return gibber_r_multicast_packet_diff (target_ack->first_packet_id,
385       ack->packet_id) > 0;
386 }
387 
388 static gboolean
failure_not_acked(gpointer key,gpointer value,gpointer user_data)389 failure_not_acked (gpointer key, gpointer value, gpointer user_data)
390 {
391   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
392   struct _group_ht_data *hd = (struct _group_ht_data *) user_data;
393 
394   if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL)
395     return FALSE;
396 
397   /* A failure is acked iff each sender has acked it's last packet (direct ack)
398    * or a sender acked a packet of another sender acking the failures last
399    * packet (indirect ack) or if the sender never has even heard of this node.
400    * */
401 
402   if (get_direct_ack (sender, hd->target) != NULL)
403     return FALSE;
404 
405   hd->sender = sender;
406 
407   return g_hash_table_find (hd->group->senders, find_indirect_ack, hd) == NULL;
408 }
409 
410 static gboolean
can_gc_sender(GibberRMulticastSenderGroup * group,GibberRMulticastSender * sender)411 can_gc_sender (GibberRMulticastSenderGroup *group,
412     GibberRMulticastSender *sender)
413 {
414   struct _group_ht_data hd;
415   GibberRMulticastSender *ret;
416 
417   hd.group = group;
418   hd.target = sender;
419 
420   ret = g_hash_table_find (group->senders, failure_not_acked, &hd);
421 
422   if (ret == NULL)
423     DEBUG_SENDER (sender, "Removed by GC");
424   else
425     DEBUG_SENDER (sender, "Not removed by GC because of %s (%x)",
426       ret->name, ret->id);
427 
428   return ret == NULL;
429 }
430 
431 static void
gibber_r_multicast_sender_group_gc(GibberRMulticastSenderGroup * group)432 gibber_r_multicast_sender_group_gc (GibberRMulticastSenderGroup *group)
433 {
434   GArray *array;
435   guint i;
436 
437   array = g_array_sized_new (FALSE, TRUE, sizeof (AckInfo),
438       g_hash_table_size (group->senders));
439 
440   g_hash_table_foreach (group->senders, create_sender_array, array);
441   g_hash_table_foreach (group->senders, update_sender_acks, array);
442 
443   for (i = 0; i < array->len ; i++)
444     {
445       AckInfo *info = &g_array_index (array, AckInfo, i);
446       GibberRMulticastSender *sender = g_hash_table_lookup (group->senders,
447         GUINT_TO_POINTER (info->sender_id));
448 
449       gibber_r_multicast_sender_ack (sender, info->packet_id);
450     }
451 
452   g_array_unref (array);
453 
454   /* Check if we can remove pending removals */
455   for (i = 0; i < group->pending_removal->len ; i++)
456     {
457       GibberRMulticastSender *s;
458 
459       s = GIBBER_R_MULTICAST_SENDER (g_ptr_array_index (group->pending_removal,
460           i));
461       if (can_gc_sender (group, s))
462         {
463           g_ptr_array_remove_index_fast (group->pending_removal, i);
464           gibber_r_multicast_sender_group_gc_acks (group, s->id);
465           g_object_unref (s);
466           /* Last entry has replaced i, so force a retry of i */
467           i--;
468         }
469     }
470 }
471 
472 gboolean
gibber_r_multicast_sender_group_push_packet(GibberRMulticastSenderGroup * group,GibberRMulticastPacket * packet)473 gibber_r_multicast_sender_group_push_packet (
474     GibberRMulticastSenderGroup *group, GibberRMulticastPacket *packet)
475 {
476   gboolean handled = FALSE;
477   GibberRMulticastSender *sender;
478   guint i;
479 
480   if (packet->type == PACKET_TYPE_WHOIS_REQUEST)
481     sender = gibber_r_multicast_sender_group_lookup (group,
482         packet->data.whois_request.sender_id);
483   else
484     sender = gibber_r_multicast_sender_group_lookup (group,
485         packet->sender);
486 
487   switch (packet->type)
488     {
489       case PACKET_TYPE_WHOIS_REQUEST:
490         /* Pending removal nodes still reply to WHOIS_REQUEST to prevent new
491          * nodes from taking the same id */
492         if (sender == NULL)
493           {
494             GibberRMulticastSender *s;
495             for (i = 0; i < group->pending_removal->len ; i++)
496               {
497                 s = GIBBER_R_MULTICAST_SENDER (
498                    g_ptr_array_index (group->pending_removal, i));
499                 if (s->id == packet->data.whois_request.sender_id)
500                   {
501                     sender = s;
502                     break;
503                   }
504               }
505           }
506         /* fallthrough */
507       case PACKET_TYPE_WHOIS_REPLY:
508         if (sender != NULL)
509           {
510             gibber_r_multicast_sender_whois_push (sender, packet);
511             handled = TRUE;
512           }
513         break;
514     case PACKET_TYPE_REPAIR_REQUEST:
515         {
516           GibberRMulticastSender *rsender;
517           guint32 sender_id = packet->data.repair_request.sender_id;
518           guint32 packet_id = packet->data.repair_request.packet_id;
519 
520           rsender = gibber_r_multicast_sender_group_lookup (group, sender_id);
521 
522           g_assert (sender_id != 0);
523 
524           if (rsender != NULL &&
525                 gibber_r_multicast_sender_repair_request (rsender, packet_id))
526             {
527               /* rsender took up the repair request. */
528               handled = TRUE;
529               break;
530             }
531 
532             for (i = 0; i < group->pending_removal->len ; i++)
533               {
534                 rsender = GIBBER_R_MULTICAST_SENDER (
535                    g_ptr_array_index (group->pending_removal, i));
536                 if (rsender->id == sender_id)
537                   {
538                     if (gibber_r_multicast_sender_repair_request (rsender,
539                           packet_id))
540                       {
541                         handled = TRUE;
542                         break;
543                       }
544                   }
545               }
546             DEBUG ("Ignoring repair request for unknown original sender");
547           break;
548         }
549     case PACKET_TYPE_SESSION:
550       /* Session message aren't handled by us. But if we know the sender it's
551        * at least not a foreign sender */
552       if (sender != NULL)
553         handled = TRUE;
554       break;
555     default:
556       if (GIBBER_R_MULTICAST_PACKET_IS_RELIABLE_PACKET (packet))
557         {
558           if (sender != NULL
559                 && sender->state > GIBBER_R_MULTICAST_SENDER_STATE_NEW )
560             {
561               gibber_r_multicast_sender_push (sender, packet);
562               handled = TRUE;
563             }
564           else
565             {
566               for (i = 0; i < group->pending_removal->len ; i++)
567                 {
568                   sender = GIBBER_R_MULTICAST_SENDER (
569                      g_ptr_array_index (group->pending_removal, i));
570                   if (sender->id == packet->sender)
571                     {
572                       /* Say we have handled a reliable packet if there is any
573                        * node to remove with the right sender id.. This means
574                        * we handle packets for a removed sender longer than
575                        * strictly needed, but this doesn't hurt */
576                        handled = TRUE;
577                        break;
578                     }
579                 }
580             }
581         }
582       else
583         {
584           DEBUG ("Received unhandled packet type!!, ignoring");
585         }
586     }
587 
588   return handled;
589 }
590 
591 static void schedule_repair (GibberRMulticastSender *sender, guint32 id);
592 static void schedule_do_repair (GibberRMulticastSender *sender, guint32 id);
593 static void schedule_whois_request (GibberRMulticastSender *sender,
594     gboolean rescheduled);
595 static gboolean name_discovery_failed_cb (gpointer data);
596 static void schedule_progress_timer (GibberRMulticastSender *self);
597 
598 G_DEFINE_TYPE(GibberRMulticastSender, gibber_r_multicast_sender, G_TYPE_OBJECT)
599 
600 /* signal enum */
601 enum
602 {
603     REPAIR_REQUEST,
604     REPAIR_MESSAGE,
605     WHOIS_REPLY,
606     WHOIS_REQUEST,
607     NAME_DISCOVERED,
608     RECEIVED_DATA,
609     RECEIVED_CONTROL_PACKET,
610     FAILED,
611     LAST_SIGNAL
612 };
613 
614 /* properties */
615 enum {
616   PROP_SENDER_GROUP = 1,
617   LAST_PROPERTY
618 };
619 
620 static guint signals[LAST_SIGNAL] = {0};
621 
622 typedef struct {
623   guint32 packet_id;
624   guint timeout;
625   gboolean repeating;
626   GibberRMulticastPacket *packet;
627   GibberRMulticastSender *sender;
628   gboolean acked;
629   gboolean popped;
630 } PacketInfo;
631 
632 static void
packet_info_free(gpointer data)633 packet_info_free (gpointer data)
634 {
635   PacketInfo *p = (PacketInfo *) data;
636   if (p->packet != NULL) {
637     g_object_unref (p->packet);
638   }
639 
640   if (p->timeout != 0) {
641     g_source_remove (p->timeout);
642   }
643   g_slice_free (PacketInfo, data);
644 }
645 
646 static PacketInfo *
packet_info_new(GibberRMulticastSender * sender,guint32 packet_id)647 packet_info_new (GibberRMulticastSender*sender, guint32 packet_id)
648 {
649   PacketInfo *result;
650   result = g_slice_new0 (PacketInfo);
651   result->packet_id = packet_id;
652   result->sender = sender;
653   return result;
654 }
655 
656 static void
gibber_r_multicast_sender_init(GibberRMulticastSender * obj)657 gibber_r_multicast_sender_init (GibberRMulticastSender *obj)
658 {
659   GibberRMulticastSenderPrivate *priv =
660     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (obj);
661 
662   /* allocate any data required by the object here */
663   priv->packet_cache = g_hash_table_new_full (g_int_hash, g_int_equal,
664       NULL, packet_info_free);
665 
666   priv->acks = g_hash_table_new_full (g_int_hash, g_int_equal,
667       NULL, ack_info_free);
668 }
669 
670 static void gibber_r_multicast_sender_dispose (GObject *object);
671 static void gibber_r_multicast_sender_finalize (GObject *object);
672 
673 static void
gibber_r_multicast_sender_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)674 gibber_r_multicast_sender_set_property (GObject *object,
675     guint property_id, const GValue *value, GParamSpec *pspec)
676 {
677   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (object);
678   GibberRMulticastSenderPrivate *priv =
679     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
680 
681   switch (property_id) {
682     case PROP_SENDER_GROUP:
683       priv->group =
684           (GibberRMulticastSenderGroup *) g_value_get_pointer (value);
685       break;
686     default:
687       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
688       break;
689   }
690 }
691 
692 static void
gibber_r_multicast_sender_class_init(GibberRMulticastSenderClass * gibber_r_multicast_sender_class)693 gibber_r_multicast_sender_class_init (
694     GibberRMulticastSenderClass *gibber_r_multicast_sender_class)
695 {
696   GObjectClass *object_class =
697       G_OBJECT_CLASS (gibber_r_multicast_sender_class);
698   GParamSpec *param_spec;
699 
700   g_type_class_add_private (gibber_r_multicast_sender_class,
701       sizeof (GibberRMulticastSenderPrivate));
702 
703   object_class->dispose = gibber_r_multicast_sender_dispose;
704   object_class->finalize = gibber_r_multicast_sender_finalize;
705 
706   object_class->set_property = gibber_r_multicast_sender_set_property;
707 
708   signals[REPAIR_REQUEST] = g_signal_new ("repair-request",
709       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
710       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
711       0,
712       NULL, NULL,
713       g_cclosure_marshal_VOID__UINT,
714       G_TYPE_NONE, 1, G_TYPE_UINT);
715 
716   signals[REPAIR_MESSAGE] = g_signal_new ("repair-message",
717       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
718       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
719       0,
720       NULL, NULL,
721       g_cclosure_marshal_VOID__OBJECT,
722       G_TYPE_NONE, 1, GIBBER_TYPE_R_MULTICAST_PACKET);
723 
724   signals[RECEIVED_DATA] = g_signal_new ("received-data",
725       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
726       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
727       0,
728       NULL, NULL,
729       _gibber_signals_marshal_VOID__UINT_POINTER_ULONG,
730       G_TYPE_NONE, 3, G_TYPE_UINT, G_TYPE_POINTER, G_TYPE_ULONG);
731 
732   signals[RECEIVED_CONTROL_PACKET] = g_signal_new ("received-control-packet",
733       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
734       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
735       0,
736       NULL, NULL,
737       g_cclosure_marshal_VOID__OBJECT,
738       G_TYPE_NONE, 1, GIBBER_TYPE_R_MULTICAST_PACKET);
739 
740   signals[WHOIS_REPLY] = g_signal_new ("whois-reply",
741        G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
742        G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
743        0,
744        NULL, NULL,
745        g_cclosure_marshal_VOID__VOID,
746        G_TYPE_NONE, 0);
747 
748   signals[WHOIS_REQUEST] = g_signal_new ("whois-request",
749       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
750       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
751       0,
752       NULL, NULL,
753       g_cclosure_marshal_VOID__VOID,
754       G_TYPE_NONE, 0);
755 
756   signals[FAILED] = g_signal_new ("failed",
757       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
758       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
759       0,
760       NULL, NULL,
761       g_cclosure_marshal_VOID__VOID,
762       G_TYPE_NONE, 0);
763 
764   signals[NAME_DISCOVERED] = g_signal_new ("name-discovered",
765       G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
766       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
767       0,
768       NULL, NULL,
769       g_cclosure_marshal_VOID__STRING,
770       G_TYPE_NONE, 1, G_TYPE_STRING);
771 
772   param_spec = g_param_spec_pointer ("sendergroup",
773       "Sender Group",
774       "Group of senders",
775       G_PARAM_CONSTRUCT_ONLY |
776       G_PARAM_WRITABLE       | G_PARAM_STATIC_STRINGS);
777 
778   g_object_class_install_property (object_class, PROP_SENDER_GROUP,
779       param_spec);
780 }
781 
782 void
gibber_r_multicast_sender_dispose(GObject * object)783 gibber_r_multicast_sender_dispose (GObject *object)
784 {
785   GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (object);
786   GibberRMulticastSenderPrivate *priv =
787      GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
788 
789   DEBUG_SENDER (self, "disposing");
790 
791   if (priv->dispose_has_run)
792     return;
793 
794   priv->dispose_has_run = TRUE;
795 
796   g_hash_table_unref (priv->packet_cache);
797   g_hash_table_unref (priv->acks);
798 
799   if (priv->whois_timer != 0)
800     {
801       g_source_remove (priv->whois_timer);
802       priv->whois_timer = 0;
803     }
804 
805   if (priv->fail_timer != 0)
806     {
807       g_source_remove (priv->fail_timer);
808       priv->fail_timer = 0;
809     }
810 
811   if (G_OBJECT_CLASS (gibber_r_multicast_sender_parent_class)->dispose)
812     G_OBJECT_CLASS (gibber_r_multicast_sender_parent_class)->dispose (object);
813 }
814 
815 void
gibber_r_multicast_sender_finalize(GObject * object)816 gibber_r_multicast_sender_finalize (GObject *object)
817 {
818   GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (object);
819   /*GibberRMulticastSenderPrivate *priv =
820      GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
821      */
822 
823   /* free any data held directly by the object here */
824   g_free (self->name);
825 
826   G_OBJECT_CLASS (gibber_r_multicast_sender_parent_class)->finalize (object);
827 }
828 
829 static void
set_state(GibberRMulticastSender * sender,GibberRMulticastSenderState state)830 set_state (GibberRMulticastSender *sender,
831    GibberRMulticastSenderState state)
832 {
833   g_assert (sender->state <= state);
834 
835   sender->state = state;
836 }
837 
838 GibberRMulticastSender *
gibber_r_multicast_sender_new(guint32 id,const gchar * name,GibberRMulticastSenderGroup * group)839 gibber_r_multicast_sender_new (guint32 id, const gchar *name,
840     GibberRMulticastSenderGroup *group)
841 {
842   GibberRMulticastSender *sender;
843   GibberRMulticastSenderPrivate *priv;
844 
845   sender = g_object_new (GIBBER_TYPE_R_MULTICAST_SENDER, "sendergroup", group,
846         NULL);
847   priv = GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
848 
849   g_assert (group != NULL);
850 
851   sender->id = id;
852   sender->name = g_strdup (name);
853 
854   if (sender->name == NULL)
855     {
856       schedule_whois_request (sender, FALSE);
857       priv->fail_timer = g_timeout_add (NAME_DISCOVERY_TIME,
858         name_discovery_failed_cb, sender);
859     }
860   else
861    {
862      schedule_progress_timer (sender);
863    }
864 
865   return sender;
866 }
867 
868 static void
packet_info_try_gc(GibberRMulticastSender * sender,PacketInfo * info)869 packet_info_try_gc (GibberRMulticastSender *sender, PacketInfo *info)
870 {
871   GibberRMulticastSenderPrivate *priv =
872       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
873   guint32 packet_id, i;
874 
875   if (!info->acked || !info->popped || info->repeating)
876     return;
877 
878   packet_id = info->packet_id;
879   g_hash_table_remove (priv->packet_cache, &packet_id);
880 
881   if (packet_id == priv->first_packet)
882     {
883       for (i = packet_id; i != sender->next_output_data_packet; i++)
884         if (g_hash_table_lookup (priv->packet_cache, &i) != NULL)
885           break;
886 
887       priv->first_packet = i;
888     }
889 }
890 
891 static void
cancel_failure_timers(GibberRMulticastSender * sender)892 cancel_failure_timers (GibberRMulticastSender *sender)
893 {
894   GibberRMulticastSenderPrivate *priv =
895       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
896 
897   /* Cancel timers that are not needed anymore now the sender has failed */
898   if (priv->fail_timer != 0)
899     {
900       g_source_remove (priv->fail_timer);
901       priv->fail_timer = 0;
902     }
903 
904   /* failed, no need to get our name anymore */
905   if (priv->whois_timer != 0)
906     {
907       g_source_remove (priv->whois_timer);
908       priv->whois_timer = 0;
909     }
910 }
911 
912 static void
signal_data(GibberRMulticastSender * sender,guint16 stream_id,guint8 * data,gsize size)913 signal_data (GibberRMulticastSender *sender, guint16 stream_id,
914     guint8 *data, gsize size)
915 {
916   set_state (sender,
917     MAX(GIBBER_R_MULTICAST_SENDER_STATE_DATA_RUNNING, sender->state));
918 
919   g_signal_emit (sender, signals[RECEIVED_DATA], 0, stream_id, data, size);
920 }
921 
922 static void
signal_control_packet(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)923 signal_control_packet (GibberRMulticastSender *sender,
924     GibberRMulticastPacket *packet)
925 {
926   set_state (sender,
927     MAX (GIBBER_R_MULTICAST_SENDER_STATE_RUNNING, sender->state));
928 
929   g_signal_emit (sender, signals[RECEIVED_CONTROL_PACKET], 0, packet);
930 }
931 
932 static void
signal_failure(GibberRMulticastSender * sender)933 signal_failure (GibberRMulticastSender *sender)
934 {
935   if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
936     return;
937 
938   DEBUG_SENDER (sender, "Signalling senders failure");
939   cancel_failure_timers (sender);
940   g_signal_emit (sender, signals[FAILED], 0);
941 }
942 
943 static gboolean
name_discovery_failed_cb(gpointer data)944 name_discovery_failed_cb (gpointer data)
945 {
946   GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (data);
947   GibberRMulticastSenderPrivate *priv =
948     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
949 
950   DEBUG_SENDER (self, "Failed to discover name in time");
951 
952   g_assert (priv->whois_timer != 0);
953 
954   g_source_remove (priv->whois_timer);
955   priv->whois_timer = 0;
956   priv->fail_timer = 0;
957 
958   signal_failure (self);
959 
960   return FALSE;
961 }
962 
963 static gboolean
progress_failed_cb(gpointer data)964 progress_failed_cb (gpointer data)
965 {
966   GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (data);
967   GibberRMulticastSenderPrivate *priv =
968     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
969 
970   DEBUG_SENDER (self, "Failed to make progress in time");
971 
972   priv->fail_timer = 0;
973 
974   signal_failure (self);
975 
976   return FALSE;
977 }
978 
979 static void
schedule_progress_timer(GibberRMulticastSender * self)980 schedule_progress_timer (GibberRMulticastSender *self)
981 {
982   GibberRMulticastSenderPrivate *priv =
983     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
984 
985   /* If we didn't discover the name, that timer is still running */
986   if (self->name == NULL)
987     return;
988 
989   /* No need for a watchdog if it has failed already */
990   if (self->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
991     return;
992 
993   if (priv->fail_timer != 0)
994     g_source_remove (priv->fail_timer);
995 
996   priv->fail_timer = g_timeout_add (MAX_PROGRESS_TIMEOUT,
997       progress_failed_cb, self);
998 }
999 
1000 static void
stop_whois_discovery(GibberRMulticastSender * self)1001 stop_whois_discovery (GibberRMulticastSender *self)
1002 {
1003   GibberRMulticastSenderPrivate *priv =
1004     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
1005 
1006   if (priv->whois_timer != 0)
1007     {
1008       g_source_remove (priv->whois_timer);
1009       priv->whois_timer = 0;
1010     }
1011 
1012   if (priv->fail_timer != 0)
1013     {
1014       g_source_remove (priv->fail_timer);
1015       priv->fail_timer = 0;
1016     }
1017 
1018 }
1019 
1020 static void
name_discovered(GibberRMulticastSender * self,const gchar * name)1021 name_discovered (GibberRMulticastSender *self, const gchar *name)
1022 {
1023   stop_whois_discovery (self);
1024 
1025   self->name = g_strdup (name);
1026   DEBUG_SENDER (self, "Name discovered");
1027   g_signal_emit (self, signals[NAME_DISCOVERED], 0, self->name);
1028 
1029   schedule_progress_timer (self);
1030 }
1031 
1032 static gboolean
request_repair(gpointer data)1033 request_repair (gpointer data)
1034 {
1035   PacketInfo *info = (PacketInfo *) data;
1036 
1037   DEBUG_SENDER (info->sender, "Sending out repair request for 0x%x",
1038     info->packet_id);
1039 
1040   info->timeout = 0;
1041   g_signal_emit (info->sender, signals[REPAIR_REQUEST], 0, info->packet_id);
1042   schedule_repair (info->sender, info->packet_id);
1043 
1044   return FALSE;
1045 }
1046 
1047 
1048 static void
schedule_repair(GibberRMulticastSender * sender,guint32 id)1049 schedule_repair (GibberRMulticastSender *sender, guint32 id)
1050 {
1051   GibberRMulticastSenderPrivate *priv =
1052       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1053   PacketInfo *info;
1054   guint timeout;
1055 
1056   if (sender->state > GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
1057     return;
1058 
1059   info = g_hash_table_lookup (priv->packet_cache, &id);
1060 
1061   if (info != NULL && (info->packet != NULL || info->timeout != 0)) {
1062     return;
1063   }
1064 
1065   if (info == NULL)
1066     {
1067       info = packet_info_new (sender, id);
1068       g_hash_table_insert (priv->packet_cache, &info->packet_id, info);
1069       timeout = g_random_int_range (MIN_INITIAL_REPAIR_TIMEOUT,
1070           MAX_INITIAL_REPAIR_TIMEOUT);
1071     }
1072   else
1073     {
1074       timeout = g_random_int_range (MIN_REPAIR_TIMEOUT, MAX_REPAIR_TIMEOUT);
1075     }
1076 
1077   info->timeout = g_timeout_add (timeout, request_repair, info);
1078   DEBUG_SENDER (sender,
1079     "Scheduled repair request for 0x%x in %d ms", id, timeout);
1080 }
1081 
1082 static gboolean
do_repair(gpointer data)1083 do_repair (gpointer data)
1084 {
1085   PacketInfo *info = (PacketInfo *) data;
1086 
1087   g_assert (info != NULL && info->packet != NULL);
1088 
1089   DEBUG_SENDER (info->sender, "Sending Repair message for 0x%x",
1090     info->packet_id);
1091 
1092   info->timeout = 0;
1093   g_signal_emit (info->sender, signals[REPAIR_MESSAGE], 0, info->packet);
1094 
1095   if (info->repeating)
1096     {
1097       schedule_do_repair (info->sender, info->packet_id);
1098     }
1099 
1100   return FALSE;
1101 }
1102 
1103 static void
schedule_do_repair(GibberRMulticastSender * sender,guint32 id)1104 schedule_do_repair (GibberRMulticastSender *sender, guint32 id)
1105 {
1106   GibberRMulticastSenderPrivate *priv =
1107       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1108   PacketInfo *info;
1109   guint timeout;
1110 
1111   info = g_hash_table_lookup (priv->packet_cache, &id);
1112 
1113   g_assert (info != NULL && info->packet != NULL);
1114   if (info->timeout != 0)
1115     {
1116       /* Repair already scheduled, ignore */
1117       return;
1118     }
1119 
1120   timeout = g_random_int_range (MIN_DO_REPAIR_TIMEOUT, MAX_DO_REPAIR_TIMEOUT);
1121   info->timeout = g_timeout_add (timeout, do_repair, info);
1122   DEBUG_SENDER (sender, "Scheduled repair for 0x%x in %d ms", id, timeout);
1123 }
1124 
1125 static gboolean
do_whois_reply(gpointer data)1126 do_whois_reply (gpointer data)
1127 {
1128   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (data);
1129   GibberRMulticastSenderPrivate *priv =
1130       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1131 
1132   DEBUG_SENDER (sender, "Sending out whois reply");
1133   g_signal_emit (sender, signals[WHOIS_REPLY], 0);
1134   priv->whois_timer = 0;
1135 
1136   return FALSE;
1137 }
1138 
1139 static gboolean
do_whois_request(gpointer data)1140 do_whois_request (gpointer data)
1141 {
1142   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (data);
1143 
1144   schedule_whois_request (sender, TRUE);
1145 
1146   DEBUG_SENDER (sender, "Sending out whois request");
1147   g_signal_emit (sender, signals[WHOIS_REQUEST], 0);
1148 
1149   return FALSE;
1150 }
1151 
1152 static void
schedule_whois_request(GibberRMulticastSender * sender,gboolean rescheduled)1153 schedule_whois_request (GibberRMulticastSender *sender, gboolean rescheduled)
1154 {
1155   GibberRMulticastSenderPrivate *priv =
1156       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1157    gint timeout;
1158 
1159    if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1160      return;
1161 
1162    if (rescheduled)
1163     timeout = g_random_int_range (MIN_WHOIS_TIMEOUT, MAX_WHOIS_TIMEOUT);
1164    else
1165     timeout = g_random_int_range (MIN_FIRST_WHOIS_TIMEOUT,
1166         MAX_FIRST_WHOIS_TIMEOUT);
1167 
1168    DEBUG_SENDER (sender, "(Re)Scheduled whois request in %d ms", timeout);
1169 
1170    if (priv->whois_timer != 0)
1171      g_source_remove (priv->whois_timer);
1172 
1173    priv->whois_timer = g_timeout_add (timeout, do_whois_request, sender);
1174 }
1175 
1176 static gboolean
check_depends(GibberRMulticastSender * sender,GibberRMulticastPacket * packet,gboolean data)1177 check_depends (GibberRMulticastSender *sender, GibberRMulticastPacket *packet,
1178     gboolean data)
1179 {
1180   guint i;
1181   GibberRMulticastSenderPrivate *priv =
1182       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1183 
1184   g_assert (GIBBER_R_MULTICAST_PACKET_IS_RELIABLE_PACKET (packet));
1185 
1186   for (i = 0; i < packet->depends->len; i++)
1187     {
1188       GibberRMulticastSender *s;
1189       GibberRMulticastPacketSenderInfo *sender_info;
1190       guint32 other;
1191 
1192       sender_info = g_array_index (packet->depends,
1193           GibberRMulticastPacketSenderInfo *, i);
1194 
1195       s = gibber_r_multicast_sender_group_lookup (priv->group,
1196           sender_info->sender_id);
1197 
1198       if (s == NULL
1199           || s->state == GIBBER_R_MULTICAST_SENDER_STATE_NEW
1200           || s->state == GIBBER_R_MULTICAST_SENDER_STATE_UNKNOWN_FAILED)
1201         {
1202           DEBUG_SENDER (sender,
1203             "Unknown node in dependency list of packet %x: %x",
1204              sender_info->sender_id, packet->packet_id);
1205           continue;
1206         }
1207 
1208       if (data)
1209         other = s->next_output_data_packet;
1210       else
1211         other = s->next_output_packet;
1212 
1213       if (gibber_r_multicast_packet_diff (sender_info->packet_id, other) < 0)
1214         {
1215           DEBUG_SENDER (sender,
1216               "Waiting node %x to complete it's messages up to %x",
1217               sender_info->sender_id, sender_info->packet_id);
1218           if (s->state == GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1219             {
1220               DEBUG_SENDER (sender,
1221                 "Asking failed node %x to complete it's messages up to %x",
1222                 sender_info->sender_id, sender_info->packet_id);
1223               gibber_r_multicast_sender_update_end (s, sender_info->packet_id);
1224             }
1225           return FALSE;
1226         }
1227     }
1228 
1229   return TRUE;
1230 }
1231 
1232 static void
update_next_data_output_state(GibberRMulticastSender * self)1233 update_next_data_output_state (GibberRMulticastSender *self)
1234 {
1235   GibberRMulticastSenderPrivate *priv =
1236       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
1237 
1238   self->next_output_data_packet++;
1239 
1240   for (; self->next_output_data_packet != self->next_output_packet;
1241       self->next_output_data_packet++)
1242     {
1243       PacketInfo *p;
1244       p = g_hash_table_lookup (priv->packet_cache,
1245           &(self->next_output_data_packet));
1246 
1247       if (p == NULL)
1248         continue;
1249 
1250       if (p->packet->type == PACKET_TYPE_DATA
1251         && (p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_END))
1252         {
1253           break;
1254         }
1255     }
1256 }
1257 
1258 static gboolean
pop_data_packet(GibberRMulticastSender * sender)1259 pop_data_packet (GibberRMulticastSender *sender)
1260 {
1261   GibberRMulticastSenderPrivate *priv =
1262       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1263   PacketInfo *p;
1264   guint16 stream_id;
1265 
1266   /* If we're holding before this, skip */
1267   if (priv->holding_data &&
1268     gibber_r_multicast_packet_diff (sender->next_output_data_packet,
1269       priv->holding_point)
1270       <= 0)
1271     {
1272       DEBUG_SENDER (sender, "Holding back data finishing at %x",
1273         sender->next_output_data_packet);
1274       return FALSE;
1275     }
1276 
1277   DEBUG_SENDER (sender, "Trying to pop data finishing at %x",
1278     sender->next_output_data_packet);
1279 
1280   p = g_hash_table_lookup (priv->packet_cache,
1281     &sender->next_output_data_packet);
1282   g_assert (p != NULL);
1283 
1284   g_assert (p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_END);
1285 
1286   stream_id = p->packet->data.data.stream_id;
1287 
1288   /* Backwards search for the start, validate the pieces and check the size */
1289   if (!(p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_START))
1290     {
1291       guint32 i;
1292       gboolean found = FALSE;
1293 
1294       for (i = p->packet->packet_id - 1;
1295         gibber_r_multicast_packet_diff (priv->first_packet, i) >= 0; i--)
1296         {
1297            p = g_hash_table_lookup (priv->packet_cache, &i);
1298            if (p == NULL)
1299              continue;
1300 
1301            if (p->packet->type == PACKET_TYPE_DATA
1302              && p->packet->data.data.stream_id == stream_id
1303              && (p->packet->data.data.flags &
1304                   GIBBER_R_MULTICAST_DATA_PACKET_START))
1305                {
1306                  found = TRUE;
1307                  break;
1308                }
1309         }
1310 
1311       if (!found)
1312         {
1313           /* If we couldn't find the start it must have happened before we
1314            * joined the causal ordering */
1315           DEBUG_SENDER (sender,
1316             "Ignoring data starting before our first packet");
1317           update_next_data_output_state (sender);
1318           return TRUE;
1319         }
1320     }
1321 
1322   /* p is guaranteed to be the PacketInfo of the first packet */
1323 
1324   /* If there is data from before our startpoint, ignore it */
1325   if (sender->state != GIBBER_R_MULTICAST_SENDER_STATE_DATA_RUNNING
1326       && !priv->start_data)
1327   {
1328      DEBUG_SENDER (sender,
1329          "Ignoring data as we don't have a data startpoint yet");
1330      update_next_data_output_state (sender);
1331      return TRUE;
1332   }
1333 
1334   if (priv->start_data &&
1335       gibber_r_multicast_packet_diff (priv->start_point, p->packet_id) < 0)
1336     {
1337        DEBUG_SENDER (sender,
1338            "Ignoring data from before the data startpoint");
1339        update_next_data_output_state (sender);
1340        return TRUE;
1341     }
1342 
1343 
1344   if (!check_depends (sender, p->packet, TRUE))
1345     {
1346       return FALSE;
1347     }
1348 
1349   /* Everything is fine, now do a forward pass to gather all the payload, we
1350    * could have cached this info, but oh well */
1351   DEBUG_SENDER (sender, "Popping data 0x%x -> 0x%x stream_id: %x",
1352     p->packet_id, sender->next_output_data_packet,
1353     p->packet->data.data.stream_id);
1354 
1355   if (p->packet->packet_id == sender->next_output_data_packet)
1356     {
1357       gsize size;
1358       guint8 *data;
1359 
1360       data = gibber_r_multicast_packet_get_payload (p->packet, &size);
1361 
1362       if (size != p->packet->data.data.total_size)
1363         goto incorrect_data_size;
1364 
1365       update_next_data_output_state (sender);
1366       signal_data (sender, p->packet->data.data.stream_id, data, size);
1367 
1368       p->popped = TRUE;
1369       packet_info_try_gc (sender, p);
1370     }
1371   else
1372     {
1373       gsize off = 0;
1374       guint8 *data = NULL, *d;
1375       guint32 payload_size, i;
1376       gsize size;
1377 
1378       payload_size = p->packet->data.data.total_size;
1379       data = g_malloc (payload_size);
1380 
1381       for (i = p->packet_id ; i != sender->next_output_data_packet + 1 ; i++)
1382         {
1383           PacketInfo *tp  = g_hash_table_lookup (priv->packet_cache, &i);
1384 
1385           if (tp == NULL)
1386             continue;
1387 
1388           if (tp->packet->type == PACKET_TYPE_DATA
1389               && tp->packet->data.data.stream_id == stream_id)
1390             {
1391               d = gibber_r_multicast_packet_get_payload (tp->packet, &size);
1392               if (off + size > payload_size)
1393                 {
1394                   off += size;
1395                   break;
1396                 }
1397 
1398               memcpy (data + off, d, size);
1399               off += size;
1400 
1401               tp->popped = TRUE;
1402               packet_info_try_gc (sender, tp);
1403             }
1404         }
1405 
1406       if (off != payload_size)
1407         {
1408           g_free (data);
1409           goto incorrect_data_size;
1410         }
1411 
1412       update_next_data_output_state (sender);
1413       signal_data (sender, stream_id, data, payload_size);
1414       g_free (data);
1415     }
1416 
1417   return TRUE;
1418 
1419 incorrect_data_size:
1420 
1421   DEBUG_SENDER (sender, "Data packet didn't have the claimed amount of data");
1422   signal_failure (sender);
1423   return FALSE;
1424 }
1425 
1426 static void
update_acks(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)1427 update_acks (GibberRMulticastSender *sender, GibberRMulticastPacket *packet)
1428 {
1429   GibberRMulticastSenderPrivate *priv =
1430       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1431 
1432   guint i;
1433   gboolean updated = FALSE;
1434 
1435   for (i = 0; i < packet->depends->len; i++)
1436     {
1437       GibberRMulticastPacketSenderInfo *senderinfo;
1438       AckInfo *info;
1439 
1440       senderinfo = g_array_index (packet->depends,
1441         GibberRMulticastPacketSenderInfo *, i);
1442 
1443       info = (AckInfo *) g_hash_table_lookup (priv->acks,
1444           &senderinfo->sender_id);
1445 
1446       if (G_UNLIKELY(info == NULL))
1447         {
1448           info = ack_info_new (senderinfo->sender_id);
1449           g_hash_table_insert (priv->acks, &info->sender_id, info);
1450           info->packet_id = senderinfo->packet_id;
1451           info->first_packet_id = packet->packet_id;
1452           updated = TRUE;
1453         }
1454 
1455       if (gibber_r_multicast_packet_diff (info->packet_id,
1456            senderinfo->packet_id) < 0)
1457         {
1458           DEBUG_SENDER (sender, "Acks are going backward!");
1459           signal_failure (sender);
1460           return;
1461         }
1462 
1463       if (gibber_r_multicast_packet_diff (info->packet_id,
1464           senderinfo->packet_id) > 0)
1465         {
1466           info->packet_id = senderinfo->packet_id;
1467           info->first_packet_id = packet->packet_id;
1468           updated = TRUE;
1469         }
1470     }
1471 
1472     if (updated)
1473       {
1474         gibber_r_multicast_sender_group_gc (priv->group);
1475       }
1476 }
1477 
1478 static gboolean
pop_packet(GibberRMulticastSender * sender)1479 pop_packet (GibberRMulticastSender *sender)
1480 {
1481   GibberRMulticastSenderPrivate *priv =
1482       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1483   PacketInfo *p;
1484 
1485   DEBUG_SENDER (sender, "Next output: 0x%x Next output data: 0x%x",
1486       sender->next_output_packet, sender->next_output_data_packet);
1487 
1488   if (sender->next_output_data_packet != sender->next_output_packet)
1489     {
1490       /* We saw the end of some data message before the end of the data stream,
1491        * first try if we can pop this */
1492        if (pop_data_packet (sender))
1493            return TRUE;
1494     }
1495 
1496   if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_FAILED
1497       && gibber_r_multicast_packet_diff (priv->end_point,
1498         sender->next_output_packet) >= 0)
1499     {
1500        DEBUG_SENDER (sender, "Not looking at packets behind the endpoint");
1501        return FALSE;
1502     }
1503 
1504   p = g_hash_table_lookup (priv->packet_cache, &(sender->next_output_packet));
1505 
1506   DEBUG_SENDER (sender, "Looking at 0x%x", sender->next_output_packet);
1507 
1508   if (p == NULL || p->packet == NULL)
1509     {
1510       /* No packet yet.. too bad :( */
1511       DEBUG_SENDER(sender, "No new packets to pop");
1512       return FALSE;
1513     }
1514 
1515   g_assert (GIBBER_R_MULTICAST_PACKET_IS_RELIABLE_PACKET (p->packet));
1516 
1517   update_acks (sender, p->packet);
1518 
1519   if (!check_depends (sender, p->packet, FALSE))
1520     {
1521       return FALSE;
1522     }
1523 
1524   if (p->packet->type == PACKET_TYPE_DATA)
1525     {
1526       /* A data packet. If we had a potential end before this one, skip it
1527        * we're holding back the data for some reason otherwise check
1528        * if it's an end */
1529 
1530       if (sender->next_output_data_packet == sender->next_output_packet)
1531         {
1532           sender->next_output_packet++;
1533           /* If this is the end, try to pop it. Otherwise ignore */
1534           if (p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_END)
1535             {
1536               /* If we could pop this, then advance next_output_data_packet
1537                * otherwise keep it at this location */
1538               pop_data_packet (sender);
1539             }
1540           else
1541             {
1542               sender->next_output_data_packet++;
1543             }
1544         }
1545       else
1546         {
1547           sender->next_output_packet++;
1548         }
1549     }
1550   else
1551     {
1552       if (sender->next_output_packet == sender->next_output_data_packet)
1553         sender->next_output_data_packet++;
1554 
1555       sender->next_output_packet++;
1556 
1557       if (p->packet->type != PACKET_TYPE_NO_DATA)
1558         {
1559          signal_control_packet (sender, p->packet);
1560         }
1561 
1562       p->popped = TRUE;
1563       packet_info_try_gc (sender, p);
1564     }
1565 
1566   /* We successfully popped a new packet (weee), reschedule our watch dog */
1567   schedule_progress_timer (sender);
1568 
1569   return TRUE;
1570 }
1571 
1572 static gboolean
do_pop_packets(GibberRMulticastSender * sender)1573 do_pop_packets (GibberRMulticastSender *sender)
1574 {
1575   gboolean popped = FALSE;
1576   GibberRMulticastSenderPrivate *priv =
1577       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1578 
1579   if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING
1580       || sender->state > GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1581   {
1582     /* No popping untill we have at least some information */
1583     return FALSE;
1584   }
1585 
1586   /* Don't pop if our sender group was stopped */
1587   if (priv->group->stopped)
1588     return FALSE;
1589 
1590   g_object_ref (sender);
1591 
1592   while (sender->state <= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1593     {
1594       if (!pop_packet (sender))
1595         break;
1596 
1597       popped = TRUE;
1598     }
1599 
1600   g_object_unref (sender);
1601 
1602   return popped;
1603 }
1604 
1605 static void
senders_collect(gpointer key,gpointer value,gpointer user_data)1606 senders_collect (gpointer key, gpointer value, gpointer user_data)
1607 {
1608   GibberRMulticastSender *s = GIBBER_R_MULTICAST_SENDER(value);
1609   GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (user_data);
1610   GibberRMulticastSenderPrivate *priv =
1611       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1612 
1613   if (s->state < GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL)
1614     g_queue_push_tail (priv->group->pop_queue, s);
1615 }
1616 
1617 
1618 static void
pop_packets(GibberRMulticastSender * sender)1619 pop_packets (GibberRMulticastSender *sender)
1620 {
1621   GibberRMulticastSenderPrivate *priv =
1622       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1623   gboolean pop;
1624 
1625 
1626   if (priv->group->popping)
1627     {
1628       if (!g_queue_find (priv->group->pop_queue, sender))
1629         {
1630           /* Ensure that data is popped at the next opportunity */
1631           g_queue_push_tail (priv->group->pop_queue, sender);
1632         }
1633       return;
1634     }
1635 
1636 
1637   priv->group->popping = TRUE;
1638 
1639   g_object_ref (sender);
1640 
1641   pop = do_pop_packets (sender);
1642 
1643   /* If something is popped or a node queued itself for popping, go for it */
1644   while (pop || g_queue_peek_head (priv->group->pop_queue) != NULL)
1645     {
1646       GibberRMulticastSender *s;
1647 
1648       /* If something was popped, try to pop as much as possible from others in
1649        * this group. Else just pop all senders in the queue */
1650       if (pop)
1651         {
1652           while (g_queue_pop_head (priv->group->pop_queue) != NULL)
1653             /* pass */;
1654 
1655           g_hash_table_foreach (priv->group->senders, senders_collect, sender);
1656         }
1657 
1658       pop = FALSE;
1659       while ((s = g_queue_pop_head (priv->group->pop_queue)) != NULL)
1660         {
1661           pop |= do_pop_packets (s);
1662         }
1663     }
1664 
1665   priv->group->popping = FALSE;
1666   g_object_unref (sender);
1667 }
1668 
1669 static void
insert_packet(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)1670 insert_packet (GibberRMulticastSender *sender, GibberRMulticastPacket *packet)
1671 {
1672   GibberRMulticastSenderPrivate *priv =
1673       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1674   PacketInfo *info;
1675 
1676   g_assert (sender->state > GIBBER_R_MULTICAST_SENDER_STATE_NEW);
1677 
1678   info = g_hash_table_lookup (priv->packet_cache, &packet->packet_id);
1679   if (info != NULL && info->packet != NULL)
1680     {
1681       /* Already seen this packet */
1682       DEBUG_SENDER (sender, "Detect resent of packet 0x%x", packet->packet_id);
1683       return;
1684     }
1685 
1686   if (info == NULL)
1687     {
1688       info = packet_info_new (sender, packet->packet_id);
1689       g_hash_table_insert (priv->packet_cache, &info->packet_id, info);
1690     }
1691 
1692   if (info->timeout != 0)
1693     {
1694       g_source_remove (info->timeout);
1695       info->timeout = 0;
1696     }
1697 
1698   DEBUG_SENDER (sender, "Inserting packet 0x%x", packet->packet_id);
1699   info->packet = g_object_ref (packet);
1700 
1701   if (gibber_r_multicast_packet_diff (sender->next_input_packet,
1702                  packet->packet_id) >= 0)
1703     {
1704       /* Potentially needs some repairs */
1705       guint32 i;
1706       for (i = sender->next_input_packet; i != packet->packet_id; i++)
1707         {
1708           schedule_repair (sender, i);
1709         }
1710       sender->next_input_packet = packet->packet_id + 1;
1711     }
1712 
1713   /* pop out as many packets as we can */
1714   pop_packets (sender);
1715 
1716   return;
1717 }
1718 
1719 
1720 void
gibber_r_multicast_sender_update_start(GibberRMulticastSender * sender,guint32 packet_id)1721 gibber_r_multicast_sender_update_start (GibberRMulticastSender *sender,
1722     guint32 packet_id)
1723 {
1724   GibberRMulticastSenderPrivate *priv =
1725       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1726 
1727   DEBUG_SENDER (sender, "Updating start to %x", packet_id);
1728   g_assert (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_FAILED);
1729 
1730   if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_NEW)
1731     {
1732       g_assert (g_hash_table_size (priv->packet_cache) == 0);
1733 
1734       set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_PREPARING);
1735 
1736       sender->next_input_packet = packet_id;
1737       sender->next_output_packet = packet_id;
1738       sender->next_output_data_packet = packet_id;
1739       priv->first_packet = packet_id;
1740     }
1741   else if (gibber_r_multicast_packet_diff (sender->next_input_packet,
1742       packet_id) > 0)
1743     {
1744       /* Remove all repair requests for packets up to this packet_id */
1745       guint32 i;
1746       for (i = priv->first_packet; i < packet_id; i++)
1747         {
1748           PacketInfo *info;
1749           info = g_hash_table_lookup (priv->packet_cache, &i);
1750           if (info != NULL && info->packet == NULL && info->timeout != 0)
1751             {
1752               g_source_remove (info->timeout);
1753               info->timeout = 0;
1754             }
1755         }
1756 
1757       sender->next_input_packet = packet_id;
1758       sender->next_output_packet = packet_id;
1759       sender->next_output_data_packet = packet_id;
1760     }
1761 }
1762 
1763 void
gibber_r_multicast_sender_update_end(GibberRMulticastSender * sender,guint32 packet_id)1764 gibber_r_multicast_sender_update_end (GibberRMulticastSender *sender,
1765   guint32 packet_id)
1766 {
1767   GibberRMulticastSenderPrivate *priv =
1768       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1769 
1770   g_assert (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_FAILED);
1771 
1772   if  (gibber_r_multicast_packet_diff (priv->end_point, packet_id) >= 0)
1773     {
1774       DEBUG_SENDER (sender, "Updating end to %x", packet_id);
1775       priv->end_point = packet_id;
1776       pop_packets (sender);
1777     }
1778 }
1779 
1780 void
gibber_r_multicast_sender_set_failed(GibberRMulticastSender * sender)1781 gibber_r_multicast_sender_set_failed (GibberRMulticastSender *sender)
1782 {
1783   GibberRMulticastSenderPrivate *priv =
1784       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1785 
1786   if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1787     return;
1788 
1789   if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING)
1790     {
1791       DEBUG_SENDER (sender, "Failed before we knew anything");
1792       set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_UNKNOWN_FAILED);
1793     }
1794   else
1795     {
1796       set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_FAILED);
1797       priv->end_point = sender->next_output_packet;
1798       DEBUG_SENDER (sender, "Marked sender as failed. Endpoint %x",
1799         priv->end_point);
1800     }
1801 
1802   cancel_failure_timers (sender);
1803 }
1804 
1805 void
gibber_r_multicast_sender_set_data_start(GibberRMulticastSender * sender,guint32 packet_id)1806 gibber_r_multicast_sender_set_data_start (GibberRMulticastSender *sender,
1807     guint32 packet_id)
1808 {
1809   GibberRMulticastSenderPrivate *priv =
1810       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1811 
1812   g_assert (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_DATA_RUNNING);
1813 
1814   DEBUG_SENDER (sender, "Setting data start at 0x%x", packet_id);
1815 
1816   priv->start_data = TRUE;
1817   priv->start_point = packet_id;
1818 }
1819 
1820 void
gibber_r_multicast_sender_push(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)1821 gibber_r_multicast_sender_push (GibberRMulticastSender *sender,
1822     GibberRMulticastPacket *packet)
1823 {
1824   GibberRMulticastSenderPrivate *priv =
1825       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1826   gint diff;
1827 
1828   g_assert (sender->id == packet->sender);
1829 
1830   if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING)
1831     {
1832       /* Don't know where to start, so ignore..
1833        * A potential optimisation would be to cache a limited amount anyway, so
1834        * we don't have to repair them if we should have catched these anyways
1835        * */
1836       return;
1837     }
1838 
1839   diff = gibber_r_multicast_packet_diff (sender->next_output_packet,
1840       packet->packet_id);
1841 
1842   if (diff >= 0 && diff < PACKET_CACHE_SIZE) {
1843     insert_packet (sender, packet);
1844     return;
1845   }
1846 
1847   if (diff < 0 && gibber_r_multicast_packet_diff (priv->first_packet,
1848              packet->packet_id) >= 0)
1849     {
1850       /* We already had this one, silently ignore */
1851       DEBUG_SENDER (sender, "Detect resent of packet 0x%x",
1852           packet->packet_id);
1853       return;
1854     }
1855 
1856   DEBUG_SENDER (sender, "Packet 0x%x out of range, dropping (%x %x %x)",
1857       packet->packet_id, priv->first_packet,
1858       sender->next_output_packet, sender->next_input_packet);
1859 }
1860 
1861 gboolean
gibber_r_multicast_sender_repair_request(GibberRMulticastSender * sender,guint32 id)1862 gibber_r_multicast_sender_repair_request (GibberRMulticastSender *sender,
1863     guint32 id)
1864 {
1865   GibberRMulticastSenderPrivate *priv =
1866       GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1867   gint diff;
1868   PacketInfo *info;
1869 
1870   if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING)
1871     {
1872       DEBUG_SENDER (sender, "ignore repair request");
1873       return FALSE;
1874     }
1875 
1876   info = g_hash_table_lookup (priv->packet_cache, &id);
1877   if (info != NULL && info->packet != NULL)
1878     {
1879       schedule_do_repair (sender, id);
1880       return TRUE;
1881     }
1882 
1883   diff = gibber_r_multicast_packet_diff (sender->next_output_packet, id);
1884 
1885   if (diff >= 0 && diff < PACKET_CACHE_SIZE)
1886     {
1887       if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
1888         /* Beyond stopped state we only send out repairs for packets we have */
1889         return FALSE;
1890 
1891       if (info == NULL)
1892         {
1893           guint32 i;
1894 
1895           for (i = sender->next_output_packet ; i != id + 1; i++)
1896             {
1897               schedule_repair (sender, i);
1898             }
1899         }
1900       else
1901         {
1902           /* else we already knew about the packets existance, but didn't see
1903            the packet just yet. Which means we already have a repair timeout
1904            running */
1905            g_assert (info->timeout != 0);
1906            /* Reschedule the repair */
1907            g_source_remove (info->timeout);
1908            info->timeout = 0;
1909            schedule_repair (sender, id);
1910         }
1911 
1912       return TRUE;
1913     }
1914 
1915   DEBUG_SENDER (sender, "Repair request packet 0x%x out of range, ignoring",
1916       id);
1917 
1918   return FALSE;
1919 }
1920 
1921 gboolean
gibber_r_multicast_sender_seen(GibberRMulticastSender * sender,guint32 id)1922 gibber_r_multicast_sender_seen (GibberRMulticastSender *sender, guint32 id)
1923 {
1924   gint diff;
1925   guint32 i, last;
1926 
1927   g_assert (sender != NULL);
1928   DEBUG_SENDER(sender, "Seen next packet 0x%x", id);
1929 
1930   if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING
1931       || sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_UNKNOWN_FAILED)
1932     {
1933       return FALSE;
1934     }
1935 
1936   diff = gibber_r_multicast_packet_diff (sender->next_input_packet, id);
1937   if (diff < 0)
1938     return TRUE;
1939 
1940   last = sender->next_output_packet + PACKET_CACHE_SIZE;
1941 
1942   /* Ensure that we don't overfill the CACHE */
1943   last = gibber_r_multicast_packet_diff (last, id) > 0 ? last : id;
1944 
1945   for (i = sender->next_input_packet; i != last; i ++)
1946     {
1947       schedule_repair (sender, i);
1948     }
1949   return FALSE;
1950 }
1951 
1952 void
gibber_r_multicast_senders_updated(GibberRMulticastSender * sender)1953 gibber_r_multicast_senders_updated (GibberRMulticastSender *sender)
1954 {
1955   pop_packets (sender);
1956 }
1957 
1958 void
gibber_r_multicast_sender_whois_push(GibberRMulticastSender * sender,const GibberRMulticastPacket * packet)1959 gibber_r_multicast_sender_whois_push (GibberRMulticastSender *sender,
1960     const GibberRMulticastPacket *packet)
1961 {
1962   GibberRMulticastSenderPrivate *priv =
1963     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1964 
1965   switch (packet->type) {
1966     case PACKET_TYPE_WHOIS_REQUEST:
1967       g_assert (packet->data.whois_request.sender_id == sender->id);
1968 
1969       if (sender->name != NULL)
1970         {
1971           if (priv->whois_timer == 0)
1972             {
1973               gint timeout = g_random_int_range (MIN_WHOIS_REPLY_TIMEOUT,
1974                    MAX_WHOIS_REPLY_TIMEOUT);
1975               priv->whois_timer =
1976                 g_timeout_add (timeout, do_whois_reply, sender);
1977               DEBUG_SENDER (sender, "Scheduled whois reply in %d ms", timeout);
1978             }
1979         }
1980       else
1981         {
1982           schedule_whois_request (sender, TRUE);
1983         }
1984       break;
1985     case PACKET_TYPE_WHOIS_REPLY:
1986       g_assert (packet->sender == sender->id);
1987 
1988       if (sender->name == NULL)
1989         {
1990           name_discovered (sender, packet->data.whois_reply.sender_name);
1991         }
1992       else
1993         {
1994           /* FIXME: collision detection */
1995           stop_whois_discovery (sender);
1996         }
1997 
1998       pop_packets (sender);
1999       break;
2000     default:
2001       g_assert_not_reached ();
2002   }
2003 }
2004 
2005 void
gibber_r_multicast_sender_set_packet_repeat(GibberRMulticastSender * sender,guint32 packet_id,gboolean repeat)2006 gibber_r_multicast_sender_set_packet_repeat (GibberRMulticastSender *sender,
2007     guint32 packet_id, gboolean repeat)
2008 {
2009   GibberRMulticastSenderPrivate *priv =
2010     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2011 
2012   PacketInfo *info;
2013 
2014   info = g_hash_table_lookup (priv->packet_cache, &packet_id);
2015   g_assert (info != NULL && info->packet != NULL);
2016 
2017   if (info->repeating == repeat)
2018     {
2019       return;
2020     }
2021 
2022   info->repeating = repeat;
2023 
2024   if (repeat)
2025     {
2026       if (info->timeout == 0)
2027          schedule_do_repair (sender, packet_id);
2028     }
2029   else
2030    {
2031      packet_info_try_gc (sender, info);
2032    }
2033 
2034   /* FIXME: If repeat is turned off, we repeat it at least once more as there
2035    * might have been a repair request after the last repeating.. This is
2036    * ofcourse suboptimal */
2037 }
2038 
2039 guint
gibber_r_multicast_sender_packet_cache_size(GibberRMulticastSender * sender)2040 gibber_r_multicast_sender_packet_cache_size ( GibberRMulticastSender *sender)
2041 {
2042   GibberRMulticastSenderPrivate *priv =
2043     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2044 
2045   /* The important cache size is until our cutoff point, which can be less
2046    * then the last packet we actually did receive from this sender */
2047   if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
2048     return gibber_r_multicast_packet_diff (priv->first_packet,
2049         priv->end_point);
2050 
2051   return gibber_r_multicast_packet_diff (priv->first_packet,
2052       sender->next_input_packet);
2053 }
2054 
2055 static AckInfo *
gibber_r_multicast_sender_get_ackinfo(GibberRMulticastSender * sender,guint32 sender_id)2056 gibber_r_multicast_sender_get_ackinfo (GibberRMulticastSender *sender,
2057     guint32 sender_id)
2058 {
2059   GibberRMulticastSenderPrivate *priv =
2060     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2061 
2062   return (AckInfo *) g_hash_table_lookup (priv->acks, &sender_id);
2063 }
2064 
2065 void
gibber_r_multicast_sender_ack(GibberRMulticastSender * sender,guint32 ack)2066 gibber_r_multicast_sender_ack (GibberRMulticastSender *sender, guint32 ack)
2067 {
2068   guint32 i;
2069   GibberRMulticastSenderPrivate *priv =
2070     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2071 
2072   if (gibber_r_multicast_packet_diff (priv->first_packet, ack) < 0)
2073     {
2074       return;
2075     }
2076 
2077   for (i = priv->first_packet ; i != ack; i++)
2078     {
2079       PacketInfo *info;
2080 
2081       info = g_hash_table_lookup (priv->packet_cache, &i);
2082       if (info == NULL)
2083         continue;
2084 
2085       info->acked = TRUE;
2086       packet_info_try_gc (sender, info);
2087     }
2088 }
2089 
2090 
2091 
2092 /* Tell the sender to not signal data starting from this packet */
2093 void
gibber_r_multicast_sender_hold_data(GibberRMulticastSender * sender,guint32 packet_id)2094 gibber_r_multicast_sender_hold_data (GibberRMulticastSender *sender,
2095   guint32 packet_id)
2096 {
2097   GibberRMulticastSenderPrivate *priv =
2098     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(sender);
2099 
2100   priv->holding_data = TRUE;
2101   priv->holding_point = packet_id;
2102   DEBUG_SENDER (sender, "Holding data starting at %x", packet_id);
2103 
2104   /* Pop packets in case the holding_point moved forward */
2105   pop_packets (sender);
2106 }
2107 
2108 /* Stop holding back data of the sender */
2109 void
gibber_r_multicast_sender_release_data(GibberRMulticastSender * sender)2110 gibber_r_multicast_sender_release_data (GibberRMulticastSender *sender)
2111 {
2112   GibberRMulticastSenderPrivate *priv =
2113     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(sender);
2114 
2115   DEBUG_SENDER (sender, "Releasing data");
2116   priv->holding_data = FALSE;
2117   pop_packets (sender);
2118 }
2119 
2120 static void
stop_packet(gpointer key,gpointer value,gpointer user_data)2121 stop_packet (gpointer key, gpointer value, gpointer user_data)
2122 {
2123   PacketInfo *p = (PacketInfo *) value;
2124 
2125   if (p->timeout != 0)
2126     {
2127       g_source_remove (p->timeout);
2128       p->timeout = 0;
2129     }
2130 }
2131 
2132 void
gibber_r_multicast_sender_stop(GibberRMulticastSender * sender)2133 gibber_r_multicast_sender_stop (GibberRMulticastSender *sender)
2134 {
2135   GibberRMulticastSenderPrivate *priv =
2136     GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(sender);
2137 
2138   if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
2139     return;
2140 
2141   if (priv->whois_timer != 0)
2142     {
2143       g_source_remove (priv->whois_timer);
2144       priv->whois_timer = 0;
2145     }
2146 
2147   g_hash_table_foreach (priv->packet_cache, stop_packet, NULL);
2148   set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_STOPPED);
2149 }
2150 
2151 void
_gibber_r_multicast_TEST_sender_fail(GibberRMulticastSender * sender)2152 _gibber_r_multicast_TEST_sender_fail (GibberRMulticastSender *sender)
2153 {
2154   signal_failure (sender);
2155 }
2156 
2157