1 /*
2 * gibber-r-multicast-sender.c - Source for GibberRMulticastSender
3 * Copyright (C) 2006-2007 Collabora Ltd.
4 * @author Sjoerd Simons <sjoerd@luon.net>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "gibber-r-multicast-sender.h"
27 #include "gibber-util.h"
28 #include "gibber-signals-marshal.h"
29
30 #define DEBUG_FLAG DEBUG_RMULTICAST_SENDER
31 #include "gibber-debug.h"
32
/* Debug helper that prefixes the message with the name and id of the given
 * sender. The sender argument is parenthesized so any pointer expression can
 * be passed; note that it is evaluated twice. */
#define DEBUG_SENDER(sender, format, ...) \
  DEBUG("%s %x: " format, (sender)->name, (sender)->id, ##__VA_ARGS__)
35
/* NOTE(review): not referenced in the visible part of this file; presumably
 * the number of packets kept in a sender's cache -- confirm against users. */
#define PACKET_CACHE_SIZE 256

/* The MIN_ and MAX_ pairs below bound randomly chosen timer delays. The repair
 * request bounds are fed to g_random_int_range () and then g_timeout_add (),
 * so they are in milliseconds; the other pairs are presumably used the same
 * way -- confirm where their users are not visible. */

/* Delay before a repair message is sent out */
#define MIN_DO_REPAIR_TIMEOUT 50
#define MAX_DO_REPAIR_TIMEOUT 100

/* Delay before the first repair request for a newly missed packet */
#define MIN_INITIAL_REPAIR_TIMEOUT 150
#define MAX_INITIAL_REPAIR_TIMEOUT 250

/* Delay before a repair request is repeated */
#define MIN_REPAIR_TIMEOUT 500
#define MAX_REPAIR_TIMEOUT 800

#define MIN_FIRST_WHOIS_TIMEOUT 50
#define MAX_FIRST_WHOIS_TIMEOUT 200

#define MIN_WHOIS_TIMEOUT 400
#define MAX_WHOIS_TIMEOUT 600

#define MIN_WHOIS_REPLY_TIMEOUT 50
#define MAX_WHOIS_REPLY_TIMEOUT 200

/* At least one packet must be popped every 5 minutes. Reliable keepalives
 * are sent out every three minutes. Value is in milliseconds. */
#define MAX_PROGRESS_TIMEOUT 300000

/* The sender's name should be discovered within about 10 seconds or else it
 * is considered faulty. Value is in milliseconds. */
#define NAME_DISCOVERY_TIME 10000
63
/* Fetch the GibberRMulticastSenderPrivate structure attached to the sender
 * instance o. */
#define GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(o) \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), GIBBER_TYPE_R_MULTICAST_SENDER, \
      GibberRMulticastSenderPrivate))
67
68 /* private structure */
69 typedef struct _GibberRMulticastSenderPrivate GibberRMulticastSenderPrivate;
70
71 static void set_state (GibberRMulticastSender *sender,
72 GibberRMulticastSenderState state);
73
/* Per-instance private state of a GibberRMulticastSender */
struct _GibberRMulticastSenderPrivate
{
  /* Set once dispose has run, so a second dispose is a no-op */
  gboolean dispose_has_run;

  /* Hash table with packets
   * guint32 * (points at PacketInfo.packet_id) => owned PacketInfo * */
  GHashTable *packet_cache;

  /* Table with acks per sender
   * guint32 * => owned AckInfo * */
  GHashTable *acks;

  /* Sender group to which we belong (set through the "sendergroup"
   * construct-only property) */
  GibberRMulticastSenderGroup *group;

  /* Very first packet number in the current window */
  guint32 first_packet;

  /* whois reply/request timer; GLib source id, 0 when not scheduled */
  guint whois_timer;

  /* Timer after which a failure event occurs; GLib source id, 0 when not
   * scheduled */
  guint fail_timer;

  /* Whether we are holding back data currently */
  gboolean holding_data;
  guint32 holding_point;

  /* Whether we know the data starting point or not */
  gboolean start_data;
  guint32 start_point;

  /* Endpoint is just there in case we are in failure mode */
  guint32 end_point;
};
107
/* Record of what has been acknowledged for one sender */
typedef struct {
  /* Id of the sender the ack is about */
  guint32 sender_id;
  /* Acked packet id */
  guint32 packet_id;
  /* First packet that had this ack */
  guint32 first_packet_id;
} AckInfo;
114
115 static void
ack_info_free(gpointer data)116 ack_info_free (gpointer data)
117 {
118 g_slice_free (AckInfo, data);
119 }
120
121 static AckInfo *
ack_info_new(guint32 sender_id)122 ack_info_new (guint32 sender_id)
123 {
124 AckInfo *result;
125 result = g_slice_new0 (AckInfo);
126 result->sender_id = sender_id;
127 return result;
128 }
129
130
/* User data handed to the hash table callbacks that decide whether a sender
 * which is pending removal can be garbage collected */
struct _group_ht_data {
  /* Group whose senders table is being searched */
  GibberRMulticastSenderGroup *group;
  /* Sender that is a candidate for garbage collection */
  GibberRMulticastSender *target;
  /* Sender currently being checked; filled in by failure_not_acked () before
   * it searches for indirect acks */
  GibberRMulticastSender *sender;
};
136
137 static AckInfo *
138 gibber_r_multicast_sender_get_ackinfo (GibberRMulticastSender *sender,
139 guint32 sender_id);
140
141 GibberRMulticastSenderGroup *
gibber_r_multicast_sender_group_new(void)142 gibber_r_multicast_sender_group_new (void)
143 {
144 GibberRMulticastSenderGroup *result;
145 result = g_slice_new0 (GibberRMulticastSenderGroup);
146
147 result->senders = g_hash_table_new_full (g_direct_hash, g_direct_equal,
148 NULL, g_object_unref);
149 result->pop_queue = g_queue_new ();
150 result->pending_removal = g_ptr_array_new ();
151 return result;
152 }
153
154 void
gibber_r_multicast_sender_group_free(GibberRMulticastSenderGroup * group)155 gibber_r_multicast_sender_group_free (GibberRMulticastSenderGroup *group)
156 {
157 GHashTable *h;
158 guint i;
159
160 g_assert (group->popping == FALSE);
161
162 h = group->senders;
163 group->senders = NULL;
164 g_hash_table_unref (h);
165
166 for (i = 0; i < group->pending_removal->len ; i++)
167 {
168 g_object_unref (G_OBJECT (
169 g_ptr_array_index (group->pending_removal, i)));
170 }
171
172 g_ptr_array_unref (group->pending_removal);
173
174 g_queue_free (group->pop_queue);
175 g_slice_free (GibberRMulticastSenderGroup, group);
176 }
177
178 static void
stop_sender(gpointer key,gpointer value,gpointer user_data)179 stop_sender (gpointer key, gpointer value, gpointer user_data)
180 {
181 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER(value);
182
183 gibber_r_multicast_sender_stop (sender);
184 }
185
186 void
gibber_r_multicast_sender_group_stop(GibberRMulticastSenderGroup * group)187 gibber_r_multicast_sender_group_stop (GibberRMulticastSenderGroup *group)
188 {
189 g_hash_table_foreach (group->senders, stop_sender, NULL);
190 group->stopped = TRUE;
191 }
192
193 void
gibber_r_multicast_sender_group_add(GibberRMulticastSenderGroup * group,GibberRMulticastSender * sender)194 gibber_r_multicast_sender_group_add (GibberRMulticastSenderGroup *group,
195 GibberRMulticastSender *sender)
196 {
197 DEBUG ("Adding %x to sender group", sender->id);
198 g_hash_table_insert (group->senders, GUINT_TO_POINTER (sender->id), sender);
199 }
200
201
202 GibberRMulticastSender *
gibber_r_multicast_sender_group_lookup(GibberRMulticastSenderGroup * group,guint32 sender_id)203 gibber_r_multicast_sender_group_lookup (GibberRMulticastSenderGroup *group,
204 guint32 sender_id)
205 {
206 return g_hash_table_lookup (group->senders, GUINT_TO_POINTER (sender_id));
207 }
208
209 static gboolean
find_by_name(gpointer key,gpointer value,gpointer user_data)210 find_by_name (gpointer key, gpointer value, gpointer user_data)
211 {
212 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
213 const gchar *name = (gchar *) user_data;
214
215 if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL)
216 return FALSE;
217
218 return !gibber_strdiff (sender->name, name);
219 }
220
221 GibberRMulticastSender *
gibber_r_multicast_sender_group_lookup_by_name(GibberRMulticastSenderGroup * group,const gchar * name)222 gibber_r_multicast_sender_group_lookup_by_name (
223 GibberRMulticastSenderGroup *group, const gchar *name)
224 {
225 return g_hash_table_find (group->senders, find_by_name, (gpointer) name);
226 }
227
228 static void
cleanup_acks(gpointer key,gpointer value,gpointer user_data)229 cleanup_acks (gpointer key, gpointer value, gpointer user_data)
230 {
231 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
232 GibberRMulticastSenderPrivate *priv =
233 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
234
235 g_hash_table_remove (priv->acks, (guint32 *) user_data);
236 }
237
238 static void
gibber_r_multicast_sender_group_gc_acks(GibberRMulticastSenderGroup * group,guint32 sender_id)239 gibber_r_multicast_sender_group_gc_acks (GibberRMulticastSenderGroup *group,
240 guint32 sender_id)
241 {
242 guint i;
243 GibberRMulticastSender *s;
244 /* If there are no more senders for sender_id in the list, clean up the acks
245 */
246 if (g_hash_table_lookup (group->senders, GUINT_TO_POINTER (sender_id))
247 != NULL)
248 return;
249
250 for (i = 0; i < group->pending_removal->len ; i++)
251 {
252 s = GIBBER_R_MULTICAST_SENDER (
253 g_ptr_array_index (group->pending_removal, i));
254 if (s->id == sender_id)
255 return;
256 }
257
258 g_hash_table_foreach (group->senders, cleanup_acks, &sender_id);
259 }
260
261 void
gibber_r_multicast_sender_group_remove(GibberRMulticastSenderGroup * group,guint32 sender_id)262 gibber_r_multicast_sender_group_remove (GibberRMulticastSenderGroup *group,
263 guint32 sender_id)
264 {
265 GibberRMulticastSender *s;
266
267 DEBUG ("Removing %x from sender group", sender_id);
268
269 s = g_hash_table_lookup (group->senders, GUINT_TO_POINTER(sender_id));
270
271 if (s == NULL)
272 {
273 DEBUG ("Can't remove unknown sender id: %x", sender_id);
274 return;
275 }
276
277 g_queue_remove (group->pop_queue, s);
278 set_state (s, GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL);
279
280 if (gibber_r_multicast_sender_packet_cache_size (s) > 0)
281 {
282 DEBUG ("Keeping %x in cache, %d items left", sender_id,
283 gibber_r_multicast_sender_packet_cache_size (s));
284 gibber_r_multicast_sender_stop (s);
285 g_hash_table_steal (group->senders, GUINT_TO_POINTER (sender_id));
286 g_ptr_array_add (group->pending_removal, s);
287 }
288 else
289 {
290 g_hash_table_remove (group->senders, GUINT_TO_POINTER(sender_id));
291 gibber_r_multicast_sender_group_gc_acks (group, sender_id);
292 }
293
294 }
295
296 static void
create_sender_array(gpointer key,gpointer value,gpointer user_data)297 create_sender_array (gpointer key, gpointer value, gpointer user_data)
298 {
299 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
300 GArray *array = (GArray *) user_data;
301 AckInfo info;
302
303 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
304 return;
305
306 info.sender_id = sender->id;
307 info.packet_id = sender->next_input_packet;
308
309 g_array_append_val (array, info);
310 }
311
312 static void
update_sender_acks(gpointer key,gpointer value,gpointer user_data)313 update_sender_acks (gpointer key, gpointer value, gpointer user_data)
314 {
315 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
316 GArray *array = (GArray *) user_data;
317 guint i;
318
319 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
320 return;
321
322 for (i = 0; i < array->len ; i++)
323 {
324 AckInfo *info = &g_array_index (array, AckInfo, i);
325 AckInfo *ack;
326
327 if (sender->id == info->sender_id)
328 continue;
329
330 if ((ack = gibber_r_multicast_sender_get_ackinfo (sender,
331 info->sender_id)) != NULL)
332 {
333 if (gibber_r_multicast_packet_diff (ack->packet_id,
334 info->packet_id) > 0)
335 info->packet_id = ack->packet_id;
336 }
337 else
338 {
339 g_array_remove_index_fast (array, i);
340 /* The last element is now placed at location i, so retry i */
341 i--;
342 continue;
343 }
344 }
345 }
346
347 static AckInfo *
get_direct_ack(GibberRMulticastSender * sender,GibberRMulticastSender * target)348 get_direct_ack (GibberRMulticastSender *sender, GibberRMulticastSender *target)
349 {
350 AckInfo *ack;
351
352 ack = gibber_r_multicast_sender_get_ackinfo (sender, target->id);
353
354 if (G_LIKELY (ack != NULL))
355 {
356 /* Returning the direct ack if there is one */
357 if (G_LIKELY (gibber_r_multicast_packet_diff (
358 target->next_output_packet, ack->packet_id) >= 0))
359 return ack;
360 }
361
362 return NULL;
363 }
364
365 static gboolean
find_indirect_ack(gpointer key,gpointer value,gpointer user_data)366 find_indirect_ack (gpointer key, gpointer value, gpointer user_data)
367 {
368 struct _group_ht_data *hd = (struct _group_ht_data *) user_data;
369 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
370 AckInfo *target_ack, *ack;
371
372 if (sender == hd->sender)
373 return FALSE;
374
375 target_ack = get_direct_ack (sender, hd->target);
376
377 if (target_ack == NULL)
378 return FALSE;
379
380 ack = gibber_r_multicast_sender_get_ackinfo (hd->sender, sender->id);
381 if (ack == NULL)
382 return FALSE;
383
384 return gibber_r_multicast_packet_diff (target_ack->first_packet_id,
385 ack->packet_id) > 0;
386 }
387
388 static gboolean
failure_not_acked(gpointer key,gpointer value,gpointer user_data)389 failure_not_acked (gpointer key, gpointer value, gpointer user_data)
390 {
391 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (value);
392 struct _group_ht_data *hd = (struct _group_ht_data *) user_data;
393
394 if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL)
395 return FALSE;
396
397 /* A failure is acked iff each sender has acked it's last packet (direct ack)
398 * or a sender acked a packet of another sender acking the failures last
399 * packet (indirect ack) or if the sender never has even heard of this node.
400 * */
401
402 if (get_direct_ack (sender, hd->target) != NULL)
403 return FALSE;
404
405 hd->sender = sender;
406
407 return g_hash_table_find (hd->group->senders, find_indirect_ack, hd) == NULL;
408 }
409
410 static gboolean
can_gc_sender(GibberRMulticastSenderGroup * group,GibberRMulticastSender * sender)411 can_gc_sender (GibberRMulticastSenderGroup *group,
412 GibberRMulticastSender *sender)
413 {
414 struct _group_ht_data hd;
415 GibberRMulticastSender *ret;
416
417 hd.group = group;
418 hd.target = sender;
419
420 ret = g_hash_table_find (group->senders, failure_not_acked, &hd);
421
422 if (ret == NULL)
423 DEBUG_SENDER (sender, "Removed by GC");
424 else
425 DEBUG_SENDER (sender, "Not removed by GC because of %s (%x)",
426 ret->name, ret->id);
427
428 return ret == NULL;
429 }
430
431 static void
gibber_r_multicast_sender_group_gc(GibberRMulticastSenderGroup * group)432 gibber_r_multicast_sender_group_gc (GibberRMulticastSenderGroup *group)
433 {
434 GArray *array;
435 guint i;
436
437 array = g_array_sized_new (FALSE, TRUE, sizeof (AckInfo),
438 g_hash_table_size (group->senders));
439
440 g_hash_table_foreach (group->senders, create_sender_array, array);
441 g_hash_table_foreach (group->senders, update_sender_acks, array);
442
443 for (i = 0; i < array->len ; i++)
444 {
445 AckInfo *info = &g_array_index (array, AckInfo, i);
446 GibberRMulticastSender *sender = g_hash_table_lookup (group->senders,
447 GUINT_TO_POINTER (info->sender_id));
448
449 gibber_r_multicast_sender_ack (sender, info->packet_id);
450 }
451
452 g_array_unref (array);
453
454 /* Check if we can remove pending removals */
455 for (i = 0; i < group->pending_removal->len ; i++)
456 {
457 GibberRMulticastSender *s;
458
459 s = GIBBER_R_MULTICAST_SENDER (g_ptr_array_index (group->pending_removal,
460 i));
461 if (can_gc_sender (group, s))
462 {
463 g_ptr_array_remove_index_fast (group->pending_removal, i);
464 gibber_r_multicast_sender_group_gc_acks (group, s->id);
465 g_object_unref (s);
466 /* Last entry has replaced i, so force a retry of i */
467 i--;
468 }
469 }
470 }
471
472 gboolean
gibber_r_multicast_sender_group_push_packet(GibberRMulticastSenderGroup * group,GibberRMulticastPacket * packet)473 gibber_r_multicast_sender_group_push_packet (
474 GibberRMulticastSenderGroup *group, GibberRMulticastPacket *packet)
475 {
476 gboolean handled = FALSE;
477 GibberRMulticastSender *sender;
478 guint i;
479
480 if (packet->type == PACKET_TYPE_WHOIS_REQUEST)
481 sender = gibber_r_multicast_sender_group_lookup (group,
482 packet->data.whois_request.sender_id);
483 else
484 sender = gibber_r_multicast_sender_group_lookup (group,
485 packet->sender);
486
487 switch (packet->type)
488 {
489 case PACKET_TYPE_WHOIS_REQUEST:
490 /* Pending removal nodes still reply to WHOIS_REQUEST to prevent new
491 * nodes from taking the same id */
492 if (sender == NULL)
493 {
494 GibberRMulticastSender *s;
495 for (i = 0; i < group->pending_removal->len ; i++)
496 {
497 s = GIBBER_R_MULTICAST_SENDER (
498 g_ptr_array_index (group->pending_removal, i));
499 if (s->id == packet->data.whois_request.sender_id)
500 {
501 sender = s;
502 break;
503 }
504 }
505 }
506 /* fallthrough */
507 case PACKET_TYPE_WHOIS_REPLY:
508 if (sender != NULL)
509 {
510 gibber_r_multicast_sender_whois_push (sender, packet);
511 handled = TRUE;
512 }
513 break;
514 case PACKET_TYPE_REPAIR_REQUEST:
515 {
516 GibberRMulticastSender *rsender;
517 guint32 sender_id = packet->data.repair_request.sender_id;
518 guint32 packet_id = packet->data.repair_request.packet_id;
519
520 rsender = gibber_r_multicast_sender_group_lookup (group, sender_id);
521
522 g_assert (sender_id != 0);
523
524 if (rsender != NULL &&
525 gibber_r_multicast_sender_repair_request (rsender, packet_id))
526 {
527 /* rsender took up the repair request. */
528 handled = TRUE;
529 break;
530 }
531
532 for (i = 0; i < group->pending_removal->len ; i++)
533 {
534 rsender = GIBBER_R_MULTICAST_SENDER (
535 g_ptr_array_index (group->pending_removal, i));
536 if (rsender->id == sender_id)
537 {
538 if (gibber_r_multicast_sender_repair_request (rsender,
539 packet_id))
540 {
541 handled = TRUE;
542 break;
543 }
544 }
545 }
546 DEBUG ("Ignoring repair request for unknown original sender");
547 break;
548 }
549 case PACKET_TYPE_SESSION:
550 /* Session message aren't handled by us. But if we know the sender it's
551 * at least not a foreign sender */
552 if (sender != NULL)
553 handled = TRUE;
554 break;
555 default:
556 if (GIBBER_R_MULTICAST_PACKET_IS_RELIABLE_PACKET (packet))
557 {
558 if (sender != NULL
559 && sender->state > GIBBER_R_MULTICAST_SENDER_STATE_NEW )
560 {
561 gibber_r_multicast_sender_push (sender, packet);
562 handled = TRUE;
563 }
564 else
565 {
566 for (i = 0; i < group->pending_removal->len ; i++)
567 {
568 sender = GIBBER_R_MULTICAST_SENDER (
569 g_ptr_array_index (group->pending_removal, i));
570 if (sender->id == packet->sender)
571 {
572 /* Say we have handled a reliable packet if there is any
573 * node to remove with the right sender id.. This means
574 * we handle packets for a removed sender longer than
575 * strictly needed, but this doesn't hurt */
576 handled = TRUE;
577 break;
578 }
579 }
580 }
581 }
582 else
583 {
584 DEBUG ("Received unhandled packet type!!, ignoring");
585 }
586 }
587
588 return handled;
589 }
590
591 static void schedule_repair (GibberRMulticastSender *sender, guint32 id);
592 static void schedule_do_repair (GibberRMulticastSender *sender, guint32 id);
593 static void schedule_whois_request (GibberRMulticastSender *sender,
594 gboolean rescheduled);
595 static gboolean name_discovery_failed_cb (gpointer data);
596 static void schedule_progress_timer (GibberRMulticastSender *self);
597
598 G_DEFINE_TYPE(GibberRMulticastSender, gibber_r_multicast_sender, G_TYPE_OBJECT)
599
/* signal enum: indices into the signals[] array; LAST_SIGNAL is the number
 * of signals */
enum
{
  REPAIR_REQUEST,
  REPAIR_MESSAGE,
  WHOIS_REPLY,
  WHOIS_REQUEST,
  NAME_DISCOVERED,
  RECEIVED_DATA,
  RECEIVED_CONTROL_PACKET,
  FAILED,
  LAST_SIGNAL
};
613
/* properties: GObject property ids, starting at 1 */
enum {
  /* The "sendergroup" construct-only pointer property */
  PROP_SENDER_GROUP = 1,
  LAST_PROPERTY
};
619
/* Signal ids, indexed by the signal enum and filled in by class_init */
static guint signals[LAST_SIGNAL] = {0};
621
/* Bookkeeping for one packet id in a sender's packet cache */
typedef struct {
  /* Id of the packet; the cache uses a pointer to this field as key */
  guint32 packet_id;
  /* GLib source id of the pending repair timer, 0 when none is scheduled */
  guint timeout;
  /* When set, do_repair () reschedules itself after sending and the entry
   * is never garbage collected */
  gboolean repeating;
  /* The packet itself, or NULL while it has not been received; a reference
   * is dropped when the info is freed */
  GibberRMulticastPacket *packet;
  /* Sender this packet belongs to (not referenced) */
  GibberRMulticastSender *sender;
  /* Both acked and popped must be set before the entry can be collected */
  gboolean acked;
  gboolean popped;
} PacketInfo;
631
632 static void
packet_info_free(gpointer data)633 packet_info_free (gpointer data)
634 {
635 PacketInfo *p = (PacketInfo *) data;
636 if (p->packet != NULL) {
637 g_object_unref (p->packet);
638 }
639
640 if (p->timeout != 0) {
641 g_source_remove (p->timeout);
642 }
643 g_slice_free (PacketInfo, data);
644 }
645
646 static PacketInfo *
packet_info_new(GibberRMulticastSender * sender,guint32 packet_id)647 packet_info_new (GibberRMulticastSender*sender, guint32 packet_id)
648 {
649 PacketInfo *result;
650 result = g_slice_new0 (PacketInfo);
651 result->packet_id = packet_id;
652 result->sender = sender;
653 return result;
654 }
655
656 static void
gibber_r_multicast_sender_init(GibberRMulticastSender * obj)657 gibber_r_multicast_sender_init (GibberRMulticastSender *obj)
658 {
659 GibberRMulticastSenderPrivate *priv =
660 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (obj);
661
662 /* allocate any data required by the object here */
663 priv->packet_cache = g_hash_table_new_full (g_int_hash, g_int_equal,
664 NULL, packet_info_free);
665
666 priv->acks = g_hash_table_new_full (g_int_hash, g_int_equal,
667 NULL, ack_info_free);
668 }
669
670 static void gibber_r_multicast_sender_dispose (GObject *object);
671 static void gibber_r_multicast_sender_finalize (GObject *object);
672
673 static void
gibber_r_multicast_sender_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)674 gibber_r_multicast_sender_set_property (GObject *object,
675 guint property_id, const GValue *value, GParamSpec *pspec)
676 {
677 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (object);
678 GibberRMulticastSenderPrivate *priv =
679 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
680
681 switch (property_id) {
682 case PROP_SENDER_GROUP:
683 priv->group =
684 (GibberRMulticastSenderGroup *) g_value_get_pointer (value);
685 break;
686 default:
687 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
688 break;
689 }
690 }
691
/* Class initialiser: reserves the private structure, hooks up the GObject
 * virtual methods, registers the signals and installs the "sendergroup"
 * property. */
static void
gibber_r_multicast_sender_class_init (
    GibberRMulticastSenderClass *gibber_r_multicast_sender_class)
{
  GObjectClass *object_class =
      G_OBJECT_CLASS (gibber_r_multicast_sender_class);
  GParamSpec *param_spec;

  g_type_class_add_private (gibber_r_multicast_sender_class,
      sizeof (GibberRMulticastSenderPrivate));

  object_class->dispose = gibber_r_multicast_sender_dispose;
  object_class->finalize = gibber_r_multicast_sender_finalize;

  object_class->set_property = gibber_r_multicast_sender_set_property;

  /* "repair-request" (guint packet_id): a missing packet should be asked
   * for */
  signals[REPAIR_REQUEST] = g_signal_new ("repair-request",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__UINT,
      G_TYPE_NONE, 1, G_TYPE_UINT);

  /* "repair-message" (GibberRMulticastPacket *packet): a cached packet
   * should be sent out as repair */
  signals[REPAIR_MESSAGE] = g_signal_new ("repair-message",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__OBJECT,
      G_TYPE_NONE, 1, GIBBER_TYPE_R_MULTICAST_PACKET);

  /* "received-data" (guint stream_id, gpointer data, gulong size) */
  signals[RECEIVED_DATA] = g_signal_new ("received-data",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      _gibber_signals_marshal_VOID__UINT_POINTER_ULONG,
      G_TYPE_NONE, 3, G_TYPE_UINT, G_TYPE_POINTER, G_TYPE_ULONG);

  /* "received-control-packet" (GibberRMulticastPacket *packet) */
  signals[RECEIVED_CONTROL_PACKET] = g_signal_new ("received-control-packet",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__OBJECT,
      G_TYPE_NONE, 1, GIBBER_TYPE_R_MULTICAST_PACKET);

  /* "whois-reply" (no arguments) */
  signals[WHOIS_REPLY] = g_signal_new ("whois-reply",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__VOID,
      G_TYPE_NONE, 0);

  /* "whois-request" (no arguments) */
  signals[WHOIS_REQUEST] = g_signal_new ("whois-request",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__VOID,
      G_TYPE_NONE, 0);

  /* "failed" (no arguments): emitted by signal_failure () */
  signals[FAILED] = g_signal_new ("failed",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__VOID,
      G_TYPE_NONE, 0);

  /* "name-discovered" (const gchar *name) */
  signals[NAME_DISCOVERED] = g_signal_new ("name-discovered",
      G_OBJECT_CLASS_TYPE(gibber_r_multicast_sender_class),
      G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
      0,
      NULL, NULL,
      g_cclosure_marshal_VOID__STRING,
      G_TYPE_NONE, 1, G_TYPE_STRING);

  /* Write-only, construct-only raw pointer to the owning sender group */
  param_spec = g_param_spec_pointer ("sendergroup",
      "Sender Group",
      "Group of senders",
      G_PARAM_CONSTRUCT_ONLY |
      G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS);

  g_object_class_install_property (object_class, PROP_SENDER_GROUP,
      param_spec);
}
781
782 void
gibber_r_multicast_sender_dispose(GObject * object)783 gibber_r_multicast_sender_dispose (GObject *object)
784 {
785 GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (object);
786 GibberRMulticastSenderPrivate *priv =
787 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
788
789 DEBUG_SENDER (self, "disposing");
790
791 if (priv->dispose_has_run)
792 return;
793
794 priv->dispose_has_run = TRUE;
795
796 g_hash_table_unref (priv->packet_cache);
797 g_hash_table_unref (priv->acks);
798
799 if (priv->whois_timer != 0)
800 {
801 g_source_remove (priv->whois_timer);
802 priv->whois_timer = 0;
803 }
804
805 if (priv->fail_timer != 0)
806 {
807 g_source_remove (priv->fail_timer);
808 priv->fail_timer = 0;
809 }
810
811 if (G_OBJECT_CLASS (gibber_r_multicast_sender_parent_class)->dispose)
812 G_OBJECT_CLASS (gibber_r_multicast_sender_parent_class)->dispose (object);
813 }
814
815 void
gibber_r_multicast_sender_finalize(GObject * object)816 gibber_r_multicast_sender_finalize (GObject *object)
817 {
818 GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (object);
819 /*GibberRMulticastSenderPrivate *priv =
820 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
821 */
822
823 /* free any data held directly by the object here */
824 g_free (self->name);
825
826 G_OBJECT_CLASS (gibber_r_multicast_sender_parent_class)->finalize (object);
827 }
828
829 static void
set_state(GibberRMulticastSender * sender,GibberRMulticastSenderState state)830 set_state (GibberRMulticastSender *sender,
831 GibberRMulticastSenderState state)
832 {
833 g_assert (sender->state <= state);
834
835 sender->state = state;
836 }
837
838 GibberRMulticastSender *
gibber_r_multicast_sender_new(guint32 id,const gchar * name,GibberRMulticastSenderGroup * group)839 gibber_r_multicast_sender_new (guint32 id, const gchar *name,
840 GibberRMulticastSenderGroup *group)
841 {
842 GibberRMulticastSender *sender;
843 GibberRMulticastSenderPrivate *priv;
844
845 sender = g_object_new (GIBBER_TYPE_R_MULTICAST_SENDER, "sendergroup", group,
846 NULL);
847 priv = GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
848
849 g_assert (group != NULL);
850
851 sender->id = id;
852 sender->name = g_strdup (name);
853
854 if (sender->name == NULL)
855 {
856 schedule_whois_request (sender, FALSE);
857 priv->fail_timer = g_timeout_add (NAME_DISCOVERY_TIME,
858 name_discovery_failed_cb, sender);
859 }
860 else
861 {
862 schedule_progress_timer (sender);
863 }
864
865 return sender;
866 }
867
868 static void
packet_info_try_gc(GibberRMulticastSender * sender,PacketInfo * info)869 packet_info_try_gc (GibberRMulticastSender *sender, PacketInfo *info)
870 {
871 GibberRMulticastSenderPrivate *priv =
872 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
873 guint32 packet_id, i;
874
875 if (!info->acked || !info->popped || info->repeating)
876 return;
877
878 packet_id = info->packet_id;
879 g_hash_table_remove (priv->packet_cache, &packet_id);
880
881 if (packet_id == priv->first_packet)
882 {
883 for (i = packet_id; i != sender->next_output_data_packet; i++)
884 if (g_hash_table_lookup (priv->packet_cache, &i) != NULL)
885 break;
886
887 priv->first_packet = i;
888 }
889 }
890
891 static void
cancel_failure_timers(GibberRMulticastSender * sender)892 cancel_failure_timers (GibberRMulticastSender *sender)
893 {
894 GibberRMulticastSenderPrivate *priv =
895 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
896
897 /* Cancel timers that are not needed anymore now the sender has failed */
898 if (priv->fail_timer != 0)
899 {
900 g_source_remove (priv->fail_timer);
901 priv->fail_timer = 0;
902 }
903
904 /* failed, no need to get our name anymore */
905 if (priv->whois_timer != 0)
906 {
907 g_source_remove (priv->whois_timer);
908 priv->whois_timer = 0;
909 }
910 }
911
912 static void
signal_data(GibberRMulticastSender * sender,guint16 stream_id,guint8 * data,gsize size)913 signal_data (GibberRMulticastSender *sender, guint16 stream_id,
914 guint8 *data, gsize size)
915 {
916 set_state (sender,
917 MAX(GIBBER_R_MULTICAST_SENDER_STATE_DATA_RUNNING, sender->state));
918
919 g_signal_emit (sender, signals[RECEIVED_DATA], 0, stream_id, data, size);
920 }
921
922 static void
signal_control_packet(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)923 signal_control_packet (GibberRMulticastSender *sender,
924 GibberRMulticastPacket *packet)
925 {
926 set_state (sender,
927 MAX (GIBBER_R_MULTICAST_SENDER_STATE_RUNNING, sender->state));
928
929 g_signal_emit (sender, signals[RECEIVED_CONTROL_PACKET], 0, packet);
930 }
931
932 static void
signal_failure(GibberRMulticastSender * sender)933 signal_failure (GibberRMulticastSender *sender)
934 {
935 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
936 return;
937
938 DEBUG_SENDER (sender, "Signalling senders failure");
939 cancel_failure_timers (sender);
940 g_signal_emit (sender, signals[FAILED], 0);
941 }
942
943 static gboolean
name_discovery_failed_cb(gpointer data)944 name_discovery_failed_cb (gpointer data)
945 {
946 GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (data);
947 GibberRMulticastSenderPrivate *priv =
948 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
949
950 DEBUG_SENDER (self, "Failed to discover name in time");
951
952 g_assert (priv->whois_timer != 0);
953
954 g_source_remove (priv->whois_timer);
955 priv->whois_timer = 0;
956 priv->fail_timer = 0;
957
958 signal_failure (self);
959
960 return FALSE;
961 }
962
963 static gboolean
progress_failed_cb(gpointer data)964 progress_failed_cb (gpointer data)
965 {
966 GibberRMulticastSender *self = GIBBER_R_MULTICAST_SENDER (data);
967 GibberRMulticastSenderPrivate *priv =
968 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
969
970 DEBUG_SENDER (self, "Failed to make progress in time");
971
972 priv->fail_timer = 0;
973
974 signal_failure (self);
975
976 return FALSE;
977 }
978
979 static void
schedule_progress_timer(GibberRMulticastSender * self)980 schedule_progress_timer (GibberRMulticastSender *self)
981 {
982 GibberRMulticastSenderPrivate *priv =
983 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
984
985 /* If we didn't discover the name, that timer is still running */
986 if (self->name == NULL)
987 return;
988
989 /* No need for a watchdog if it has failed already */
990 if (self->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
991 return;
992
993 if (priv->fail_timer != 0)
994 g_source_remove (priv->fail_timer);
995
996 priv->fail_timer = g_timeout_add (MAX_PROGRESS_TIMEOUT,
997 progress_failed_cb, self);
998 }
999
1000 static void
stop_whois_discovery(GibberRMulticastSender * self)1001 stop_whois_discovery (GibberRMulticastSender *self)
1002 {
1003 GibberRMulticastSenderPrivate *priv =
1004 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
1005
1006 if (priv->whois_timer != 0)
1007 {
1008 g_source_remove (priv->whois_timer);
1009 priv->whois_timer = 0;
1010 }
1011
1012 if (priv->fail_timer != 0)
1013 {
1014 g_source_remove (priv->fail_timer);
1015 priv->fail_timer = 0;
1016 }
1017
1018 }
1019
1020 static void
name_discovered(GibberRMulticastSender * self,const gchar * name)1021 name_discovered (GibberRMulticastSender *self, const gchar *name)
1022 {
1023 stop_whois_discovery (self);
1024
1025 self->name = g_strdup (name);
1026 DEBUG_SENDER (self, "Name discovered");
1027 g_signal_emit (self, signals[NAME_DISCOVERED], 0, self->name);
1028
1029 schedule_progress_timer (self);
1030 }
1031
1032 static gboolean
request_repair(gpointer data)1033 request_repair (gpointer data)
1034 {
1035 PacketInfo *info = (PacketInfo *) data;
1036
1037 DEBUG_SENDER (info->sender, "Sending out repair request for 0x%x",
1038 info->packet_id);
1039
1040 info->timeout = 0;
1041 g_signal_emit (info->sender, signals[REPAIR_REQUEST], 0, info->packet_id);
1042 schedule_repair (info->sender, info->packet_id);
1043
1044 return FALSE;
1045 }
1046
1047
1048 static void
schedule_repair(GibberRMulticastSender * sender,guint32 id)1049 schedule_repair (GibberRMulticastSender *sender, guint32 id)
1050 {
1051 GibberRMulticastSenderPrivate *priv =
1052 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1053 PacketInfo *info;
1054 guint timeout;
1055
1056 if (sender->state > GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
1057 return;
1058
1059 info = g_hash_table_lookup (priv->packet_cache, &id);
1060
1061 if (info != NULL && (info->packet != NULL || info->timeout != 0)) {
1062 return;
1063 }
1064
1065 if (info == NULL)
1066 {
1067 info = packet_info_new (sender, id);
1068 g_hash_table_insert (priv->packet_cache, &info->packet_id, info);
1069 timeout = g_random_int_range (MIN_INITIAL_REPAIR_TIMEOUT,
1070 MAX_INITIAL_REPAIR_TIMEOUT);
1071 }
1072 else
1073 {
1074 timeout = g_random_int_range (MIN_REPAIR_TIMEOUT, MAX_REPAIR_TIMEOUT);
1075 }
1076
1077 info->timeout = g_timeout_add (timeout, request_repair, info);
1078 DEBUG_SENDER (sender,
1079 "Scheduled repair request for 0x%x in %d ms", id, timeout);
1080 }
1081
1082 static gboolean
do_repair(gpointer data)1083 do_repair (gpointer data)
1084 {
1085 PacketInfo *info = (PacketInfo *) data;
1086
1087 g_assert (info != NULL && info->packet != NULL);
1088
1089 DEBUG_SENDER (info->sender, "Sending Repair message for 0x%x",
1090 info->packet_id);
1091
1092 info->timeout = 0;
1093 g_signal_emit (info->sender, signals[REPAIR_MESSAGE], 0, info->packet);
1094
1095 if (info->repeating)
1096 {
1097 schedule_do_repair (info->sender, info->packet_id);
1098 }
1099
1100 return FALSE;
1101 }
1102
1103 static void
schedule_do_repair(GibberRMulticastSender * sender,guint32 id)1104 schedule_do_repair (GibberRMulticastSender *sender, guint32 id)
1105 {
1106 GibberRMulticastSenderPrivate *priv =
1107 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1108 PacketInfo *info;
1109 guint timeout;
1110
1111 info = g_hash_table_lookup (priv->packet_cache, &id);
1112
1113 g_assert (info != NULL && info->packet != NULL);
1114 if (info->timeout != 0)
1115 {
1116 /* Repair already scheduled, ignore */
1117 return;
1118 }
1119
1120 timeout = g_random_int_range (MIN_DO_REPAIR_TIMEOUT, MAX_DO_REPAIR_TIMEOUT);
1121 info->timeout = g_timeout_add (timeout, do_repair, info);
1122 DEBUG_SENDER (sender, "Scheduled repair for 0x%x in %d ms", id, timeout);
1123 }
1124
1125 static gboolean
do_whois_reply(gpointer data)1126 do_whois_reply (gpointer data)
1127 {
1128 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (data);
1129 GibberRMulticastSenderPrivate *priv =
1130 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1131
1132 DEBUG_SENDER (sender, "Sending out whois reply");
1133 g_signal_emit (sender, signals[WHOIS_REPLY], 0);
1134 priv->whois_timer = 0;
1135
1136 return FALSE;
1137 }
1138
1139 static gboolean
do_whois_request(gpointer data)1140 do_whois_request (gpointer data)
1141 {
1142 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (data);
1143
1144 schedule_whois_request (sender, TRUE);
1145
1146 DEBUG_SENDER (sender, "Sending out whois request");
1147 g_signal_emit (sender, signals[WHOIS_REQUEST], 0);
1148
1149 return FALSE;
1150 }
1151
1152 static void
schedule_whois_request(GibberRMulticastSender * sender,gboolean rescheduled)1153 schedule_whois_request (GibberRMulticastSender *sender, gboolean rescheduled)
1154 {
1155 GibberRMulticastSenderPrivate *priv =
1156 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1157 gint timeout;
1158
1159 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1160 return;
1161
1162 if (rescheduled)
1163 timeout = g_random_int_range (MIN_WHOIS_TIMEOUT, MAX_WHOIS_TIMEOUT);
1164 else
1165 timeout = g_random_int_range (MIN_FIRST_WHOIS_TIMEOUT,
1166 MAX_FIRST_WHOIS_TIMEOUT);
1167
1168 DEBUG_SENDER (sender, "(Re)Scheduled whois request in %d ms", timeout);
1169
1170 if (priv->whois_timer != 0)
1171 g_source_remove (priv->whois_timer);
1172
1173 priv->whois_timer = g_timeout_add (timeout, do_whois_request, sender);
1174 }
1175
1176 static gboolean
check_depends(GibberRMulticastSender * sender,GibberRMulticastPacket * packet,gboolean data)1177 check_depends (GibberRMulticastSender *sender, GibberRMulticastPacket *packet,
1178 gboolean data)
1179 {
1180 guint i;
1181 GibberRMulticastSenderPrivate *priv =
1182 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1183
1184 g_assert (GIBBER_R_MULTICAST_PACKET_IS_RELIABLE_PACKET (packet));
1185
1186 for (i = 0; i < packet->depends->len; i++)
1187 {
1188 GibberRMulticastSender *s;
1189 GibberRMulticastPacketSenderInfo *sender_info;
1190 guint32 other;
1191
1192 sender_info = g_array_index (packet->depends,
1193 GibberRMulticastPacketSenderInfo *, i);
1194
1195 s = gibber_r_multicast_sender_group_lookup (priv->group,
1196 sender_info->sender_id);
1197
1198 if (s == NULL
1199 || s->state == GIBBER_R_MULTICAST_SENDER_STATE_NEW
1200 || s->state == GIBBER_R_MULTICAST_SENDER_STATE_UNKNOWN_FAILED)
1201 {
1202 DEBUG_SENDER (sender,
1203 "Unknown node in dependency list of packet %x: %x",
1204 sender_info->sender_id, packet->packet_id);
1205 continue;
1206 }
1207
1208 if (data)
1209 other = s->next_output_data_packet;
1210 else
1211 other = s->next_output_packet;
1212
1213 if (gibber_r_multicast_packet_diff (sender_info->packet_id, other) < 0)
1214 {
1215 DEBUG_SENDER (sender,
1216 "Waiting node %x to complete it's messages up to %x",
1217 sender_info->sender_id, sender_info->packet_id);
1218 if (s->state == GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1219 {
1220 DEBUG_SENDER (sender,
1221 "Asking failed node %x to complete it's messages up to %x",
1222 sender_info->sender_id, sender_info->packet_id);
1223 gibber_r_multicast_sender_update_end (s, sender_info->packet_id);
1224 }
1225 return FALSE;
1226 }
1227 }
1228
1229 return TRUE;
1230 }
1231
1232 static void
update_next_data_output_state(GibberRMulticastSender * self)1233 update_next_data_output_state (GibberRMulticastSender *self)
1234 {
1235 GibberRMulticastSenderPrivate *priv =
1236 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (self);
1237
1238 self->next_output_data_packet++;
1239
1240 for (; self->next_output_data_packet != self->next_output_packet;
1241 self->next_output_data_packet++)
1242 {
1243 PacketInfo *p;
1244 p = g_hash_table_lookup (priv->packet_cache,
1245 &(self->next_output_data_packet));
1246
1247 if (p == NULL)
1248 continue;
1249
1250 if (p->packet->type == PACKET_TYPE_DATA
1251 && (p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_END))
1252 {
1253 break;
1254 }
1255 }
1256 }
1257
1258 static gboolean
pop_data_packet(GibberRMulticastSender * sender)1259 pop_data_packet (GibberRMulticastSender *sender)
1260 {
1261 GibberRMulticastSenderPrivate *priv =
1262 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1263 PacketInfo *p;
1264 guint16 stream_id;
1265
1266 /* If we're holding before this, skip */
1267 if (priv->holding_data &&
1268 gibber_r_multicast_packet_diff (sender->next_output_data_packet,
1269 priv->holding_point)
1270 <= 0)
1271 {
1272 DEBUG_SENDER (sender, "Holding back data finishing at %x",
1273 sender->next_output_data_packet);
1274 return FALSE;
1275 }
1276
1277 DEBUG_SENDER (sender, "Trying to pop data finishing at %x",
1278 sender->next_output_data_packet);
1279
1280 p = g_hash_table_lookup (priv->packet_cache,
1281 &sender->next_output_data_packet);
1282 g_assert (p != NULL);
1283
1284 g_assert (p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_END);
1285
1286 stream_id = p->packet->data.data.stream_id;
1287
1288 /* Backwards search for the start, validate the pieces and check the size */
1289 if (!(p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_START))
1290 {
1291 guint32 i;
1292 gboolean found = FALSE;
1293
1294 for (i = p->packet->packet_id - 1;
1295 gibber_r_multicast_packet_diff (priv->first_packet, i) >= 0; i--)
1296 {
1297 p = g_hash_table_lookup (priv->packet_cache, &i);
1298 if (p == NULL)
1299 continue;
1300
1301 if (p->packet->type == PACKET_TYPE_DATA
1302 && p->packet->data.data.stream_id == stream_id
1303 && (p->packet->data.data.flags &
1304 GIBBER_R_MULTICAST_DATA_PACKET_START))
1305 {
1306 found = TRUE;
1307 break;
1308 }
1309 }
1310
1311 if (!found)
1312 {
1313 /* If we couldn't find the start it must have happened before we
1314 * joined the causal ordering */
1315 DEBUG_SENDER (sender,
1316 "Ignoring data starting before our first packet");
1317 update_next_data_output_state (sender);
1318 return TRUE;
1319 }
1320 }
1321
1322 /* p is guaranteed to be the PacketInfo of the first packet */
1323
1324 /* If there is data from before our startpoint, ignore it */
1325 if (sender->state != GIBBER_R_MULTICAST_SENDER_STATE_DATA_RUNNING
1326 && !priv->start_data)
1327 {
1328 DEBUG_SENDER (sender,
1329 "Ignoring data as we don't have a data startpoint yet");
1330 update_next_data_output_state (sender);
1331 return TRUE;
1332 }
1333
1334 if (priv->start_data &&
1335 gibber_r_multicast_packet_diff (priv->start_point, p->packet_id) < 0)
1336 {
1337 DEBUG_SENDER (sender,
1338 "Ignoring data from before the data startpoint");
1339 update_next_data_output_state (sender);
1340 return TRUE;
1341 }
1342
1343
1344 if (!check_depends (sender, p->packet, TRUE))
1345 {
1346 return FALSE;
1347 }
1348
1349 /* Everything is fine, now do a forward pass to gather all the payload, we
1350 * could have cached this info, but oh well */
1351 DEBUG_SENDER (sender, "Popping data 0x%x -> 0x%x stream_id: %x",
1352 p->packet_id, sender->next_output_data_packet,
1353 p->packet->data.data.stream_id);
1354
1355 if (p->packet->packet_id == sender->next_output_data_packet)
1356 {
1357 gsize size;
1358 guint8 *data;
1359
1360 data = gibber_r_multicast_packet_get_payload (p->packet, &size);
1361
1362 if (size != p->packet->data.data.total_size)
1363 goto incorrect_data_size;
1364
1365 update_next_data_output_state (sender);
1366 signal_data (sender, p->packet->data.data.stream_id, data, size);
1367
1368 p->popped = TRUE;
1369 packet_info_try_gc (sender, p);
1370 }
1371 else
1372 {
1373 gsize off = 0;
1374 guint8 *data = NULL, *d;
1375 guint32 payload_size, i;
1376 gsize size;
1377
1378 payload_size = p->packet->data.data.total_size;
1379 data = g_malloc (payload_size);
1380
1381 for (i = p->packet_id ; i != sender->next_output_data_packet + 1 ; i++)
1382 {
1383 PacketInfo *tp = g_hash_table_lookup (priv->packet_cache, &i);
1384
1385 if (tp == NULL)
1386 continue;
1387
1388 if (tp->packet->type == PACKET_TYPE_DATA
1389 && tp->packet->data.data.stream_id == stream_id)
1390 {
1391 d = gibber_r_multicast_packet_get_payload (tp->packet, &size);
1392 if (off + size > payload_size)
1393 {
1394 off += size;
1395 break;
1396 }
1397
1398 memcpy (data + off, d, size);
1399 off += size;
1400
1401 tp->popped = TRUE;
1402 packet_info_try_gc (sender, tp);
1403 }
1404 }
1405
1406 if (off != payload_size)
1407 {
1408 g_free (data);
1409 goto incorrect_data_size;
1410 }
1411
1412 update_next_data_output_state (sender);
1413 signal_data (sender, stream_id, data, payload_size);
1414 g_free (data);
1415 }
1416
1417 return TRUE;
1418
1419 incorrect_data_size:
1420
1421 DEBUG_SENDER (sender, "Data packet didn't have the claimed amount of data");
1422 signal_failure (sender);
1423 return FALSE;
1424 }
1425
1426 static void
update_acks(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)1427 update_acks (GibberRMulticastSender *sender, GibberRMulticastPacket *packet)
1428 {
1429 GibberRMulticastSenderPrivate *priv =
1430 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1431
1432 guint i;
1433 gboolean updated = FALSE;
1434
1435 for (i = 0; i < packet->depends->len; i++)
1436 {
1437 GibberRMulticastPacketSenderInfo *senderinfo;
1438 AckInfo *info;
1439
1440 senderinfo = g_array_index (packet->depends,
1441 GibberRMulticastPacketSenderInfo *, i);
1442
1443 info = (AckInfo *) g_hash_table_lookup (priv->acks,
1444 &senderinfo->sender_id);
1445
1446 if (G_UNLIKELY(info == NULL))
1447 {
1448 info = ack_info_new (senderinfo->sender_id);
1449 g_hash_table_insert (priv->acks, &info->sender_id, info);
1450 info->packet_id = senderinfo->packet_id;
1451 info->first_packet_id = packet->packet_id;
1452 updated = TRUE;
1453 }
1454
1455 if (gibber_r_multicast_packet_diff (info->packet_id,
1456 senderinfo->packet_id) < 0)
1457 {
1458 DEBUG_SENDER (sender, "Acks are going backward!");
1459 signal_failure (sender);
1460 return;
1461 }
1462
1463 if (gibber_r_multicast_packet_diff (info->packet_id,
1464 senderinfo->packet_id) > 0)
1465 {
1466 info->packet_id = senderinfo->packet_id;
1467 info->first_packet_id = packet->packet_id;
1468 updated = TRUE;
1469 }
1470 }
1471
1472 if (updated)
1473 {
1474 gibber_r_multicast_sender_group_gc (priv->group);
1475 }
1476 }
1477
1478 static gboolean
pop_packet(GibberRMulticastSender * sender)1479 pop_packet (GibberRMulticastSender *sender)
1480 {
1481 GibberRMulticastSenderPrivate *priv =
1482 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1483 PacketInfo *p;
1484
1485 DEBUG_SENDER (sender, "Next output: 0x%x Next output data: 0x%x",
1486 sender->next_output_packet, sender->next_output_data_packet);
1487
1488 if (sender->next_output_data_packet != sender->next_output_packet)
1489 {
1490 /* We saw the end of some data message before the end of the data stream,
1491 * first try if we can pop this */
1492 if (pop_data_packet (sender))
1493 return TRUE;
1494 }
1495
1496 if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_FAILED
1497 && gibber_r_multicast_packet_diff (priv->end_point,
1498 sender->next_output_packet) >= 0)
1499 {
1500 DEBUG_SENDER (sender, "Not looking at packets behind the endpoint");
1501 return FALSE;
1502 }
1503
1504 p = g_hash_table_lookup (priv->packet_cache, &(sender->next_output_packet));
1505
1506 DEBUG_SENDER (sender, "Looking at 0x%x", sender->next_output_packet);
1507
1508 if (p == NULL || p->packet == NULL)
1509 {
1510 /* No packet yet.. too bad :( */
1511 DEBUG_SENDER(sender, "No new packets to pop");
1512 return FALSE;
1513 }
1514
1515 g_assert (GIBBER_R_MULTICAST_PACKET_IS_RELIABLE_PACKET (p->packet));
1516
1517 update_acks (sender, p->packet);
1518
1519 if (!check_depends (sender, p->packet, FALSE))
1520 {
1521 return FALSE;
1522 }
1523
1524 if (p->packet->type == PACKET_TYPE_DATA)
1525 {
1526 /* A data packet. If we had a potential end before this one, skip it
1527 * we're holding back the data for some reason otherwise check
1528 * if it's an end */
1529
1530 if (sender->next_output_data_packet == sender->next_output_packet)
1531 {
1532 sender->next_output_packet++;
1533 /* If this is the end, try to pop it. Otherwise ignore */
1534 if (p->packet->data.data.flags & GIBBER_R_MULTICAST_DATA_PACKET_END)
1535 {
1536 /* If we could pop this, then advance next_output_data_packet
1537 * otherwise keep it at this location */
1538 pop_data_packet (sender);
1539 }
1540 else
1541 {
1542 sender->next_output_data_packet++;
1543 }
1544 }
1545 else
1546 {
1547 sender->next_output_packet++;
1548 }
1549 }
1550 else
1551 {
1552 if (sender->next_output_packet == sender->next_output_data_packet)
1553 sender->next_output_data_packet++;
1554
1555 sender->next_output_packet++;
1556
1557 if (p->packet->type != PACKET_TYPE_NO_DATA)
1558 {
1559 signal_control_packet (sender, p->packet);
1560 }
1561
1562 p->popped = TRUE;
1563 packet_info_try_gc (sender, p);
1564 }
1565
1566 /* We successfully popped a new packet (weee), reschedule our watch dog */
1567 schedule_progress_timer (sender);
1568
1569 return TRUE;
1570 }
1571
1572 static gboolean
do_pop_packets(GibberRMulticastSender * sender)1573 do_pop_packets (GibberRMulticastSender *sender)
1574 {
1575 gboolean popped = FALSE;
1576 GibberRMulticastSenderPrivate *priv =
1577 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1578
1579 if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING
1580 || sender->state > GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1581 {
1582 /* No popping untill we have at least some information */
1583 return FALSE;
1584 }
1585
1586 /* Don't pop if our sender group was stopped */
1587 if (priv->group->stopped)
1588 return FALSE;
1589
1590 g_object_ref (sender);
1591
1592 while (sender->state <= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1593 {
1594 if (!pop_packet (sender))
1595 break;
1596
1597 popped = TRUE;
1598 }
1599
1600 g_object_unref (sender);
1601
1602 return popped;
1603 }
1604
1605 static void
senders_collect(gpointer key,gpointer value,gpointer user_data)1606 senders_collect (gpointer key, gpointer value, gpointer user_data)
1607 {
1608 GibberRMulticastSender *s = GIBBER_R_MULTICAST_SENDER(value);
1609 GibberRMulticastSender *sender = GIBBER_R_MULTICAST_SENDER (user_data);
1610 GibberRMulticastSenderPrivate *priv =
1611 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1612
1613 if (s->state < GIBBER_R_MULTICAST_SENDER_STATE_PENDING_REMOVAL)
1614 g_queue_push_tail (priv->group->pop_queue, s);
1615 }
1616
1617
1618 static void
pop_packets(GibberRMulticastSender * sender)1619 pop_packets (GibberRMulticastSender *sender)
1620 {
1621 GibberRMulticastSenderPrivate *priv =
1622 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1623 gboolean pop;
1624
1625
1626 if (priv->group->popping)
1627 {
1628 if (!g_queue_find (priv->group->pop_queue, sender))
1629 {
1630 /* Ensure that data is popped at the next opportunity */
1631 g_queue_push_tail (priv->group->pop_queue, sender);
1632 }
1633 return;
1634 }
1635
1636
1637 priv->group->popping = TRUE;
1638
1639 g_object_ref (sender);
1640
1641 pop = do_pop_packets (sender);
1642
1643 /* If something is popped or a node queued itself for popping, go for it */
1644 while (pop || g_queue_peek_head (priv->group->pop_queue) != NULL)
1645 {
1646 GibberRMulticastSender *s;
1647
1648 /* If something was popped, try to pop as much as possible from others in
1649 * this group. Else just pop all senders in the queue */
1650 if (pop)
1651 {
1652 while (g_queue_pop_head (priv->group->pop_queue) != NULL)
1653 /* pass */;
1654
1655 g_hash_table_foreach (priv->group->senders, senders_collect, sender);
1656 }
1657
1658 pop = FALSE;
1659 while ((s = g_queue_pop_head (priv->group->pop_queue)) != NULL)
1660 {
1661 pop |= do_pop_packets (s);
1662 }
1663 }
1664
1665 priv->group->popping = FALSE;
1666 g_object_unref (sender);
1667 }
1668
1669 static void
insert_packet(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)1670 insert_packet (GibberRMulticastSender *sender, GibberRMulticastPacket *packet)
1671 {
1672 GibberRMulticastSenderPrivate *priv =
1673 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1674 PacketInfo *info;
1675
1676 g_assert (sender->state > GIBBER_R_MULTICAST_SENDER_STATE_NEW);
1677
1678 info = g_hash_table_lookup (priv->packet_cache, &packet->packet_id);
1679 if (info != NULL && info->packet != NULL)
1680 {
1681 /* Already seen this packet */
1682 DEBUG_SENDER (sender, "Detect resent of packet 0x%x", packet->packet_id);
1683 return;
1684 }
1685
1686 if (info == NULL)
1687 {
1688 info = packet_info_new (sender, packet->packet_id);
1689 g_hash_table_insert (priv->packet_cache, &info->packet_id, info);
1690 }
1691
1692 if (info->timeout != 0)
1693 {
1694 g_source_remove (info->timeout);
1695 info->timeout = 0;
1696 }
1697
1698 DEBUG_SENDER (sender, "Inserting packet 0x%x", packet->packet_id);
1699 info->packet = g_object_ref (packet);
1700
1701 if (gibber_r_multicast_packet_diff (sender->next_input_packet,
1702 packet->packet_id) >= 0)
1703 {
1704 /* Potentially needs some repairs */
1705 guint32 i;
1706 for (i = sender->next_input_packet; i != packet->packet_id; i++)
1707 {
1708 schedule_repair (sender, i);
1709 }
1710 sender->next_input_packet = packet->packet_id + 1;
1711 }
1712
1713 /* pop out as many packets as we can */
1714 pop_packets (sender);
1715
1716 return;
1717 }
1718
1719
1720 void
gibber_r_multicast_sender_update_start(GibberRMulticastSender * sender,guint32 packet_id)1721 gibber_r_multicast_sender_update_start (GibberRMulticastSender *sender,
1722 guint32 packet_id)
1723 {
1724 GibberRMulticastSenderPrivate *priv =
1725 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1726
1727 DEBUG_SENDER (sender, "Updating start to %x", packet_id);
1728 g_assert (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_FAILED);
1729
1730 if (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_NEW)
1731 {
1732 g_assert (g_hash_table_size (priv->packet_cache) == 0);
1733
1734 set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_PREPARING);
1735
1736 sender->next_input_packet = packet_id;
1737 sender->next_output_packet = packet_id;
1738 sender->next_output_data_packet = packet_id;
1739 priv->first_packet = packet_id;
1740 }
1741 else if (gibber_r_multicast_packet_diff (sender->next_input_packet,
1742 packet_id) > 0)
1743 {
1744 /* Remove all repair requests for packets up to this packet_id */
1745 guint32 i;
1746 for (i = priv->first_packet; i < packet_id; i++)
1747 {
1748 PacketInfo *info;
1749 info = g_hash_table_lookup (priv->packet_cache, &i);
1750 if (info != NULL && info->packet == NULL && info->timeout != 0)
1751 {
1752 g_source_remove (info->timeout);
1753 info->timeout = 0;
1754 }
1755 }
1756
1757 sender->next_input_packet = packet_id;
1758 sender->next_output_packet = packet_id;
1759 sender->next_output_data_packet = packet_id;
1760 }
1761 }
1762
1763 void
gibber_r_multicast_sender_update_end(GibberRMulticastSender * sender,guint32 packet_id)1764 gibber_r_multicast_sender_update_end (GibberRMulticastSender *sender,
1765 guint32 packet_id)
1766 {
1767 GibberRMulticastSenderPrivate *priv =
1768 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1769
1770 g_assert (sender->state == GIBBER_R_MULTICAST_SENDER_STATE_FAILED);
1771
1772 if (gibber_r_multicast_packet_diff (priv->end_point, packet_id) >= 0)
1773 {
1774 DEBUG_SENDER (sender, "Updating end to %x", packet_id);
1775 priv->end_point = packet_id;
1776 pop_packets (sender);
1777 }
1778 }
1779
1780 void
gibber_r_multicast_sender_set_failed(GibberRMulticastSender * sender)1781 gibber_r_multicast_sender_set_failed (GibberRMulticastSender *sender)
1782 {
1783 GibberRMulticastSenderPrivate *priv =
1784 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1785
1786 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
1787 return;
1788
1789 if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING)
1790 {
1791 DEBUG_SENDER (sender, "Failed before we knew anything");
1792 set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_UNKNOWN_FAILED);
1793 }
1794 else
1795 {
1796 set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_FAILED);
1797 priv->end_point = sender->next_output_packet;
1798 DEBUG_SENDER (sender, "Marked sender as failed. Endpoint %x",
1799 priv->end_point);
1800 }
1801
1802 cancel_failure_timers (sender);
1803 }
1804
1805 void
gibber_r_multicast_sender_set_data_start(GibberRMulticastSender * sender,guint32 packet_id)1806 gibber_r_multicast_sender_set_data_start (GibberRMulticastSender *sender,
1807 guint32 packet_id)
1808 {
1809 GibberRMulticastSenderPrivate *priv =
1810 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1811
1812 g_assert (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_DATA_RUNNING);
1813
1814 DEBUG_SENDER (sender, "Setting data start at 0x%x", packet_id);
1815
1816 priv->start_data = TRUE;
1817 priv->start_point = packet_id;
1818 }
1819
1820 void
gibber_r_multicast_sender_push(GibberRMulticastSender * sender,GibberRMulticastPacket * packet)1821 gibber_r_multicast_sender_push (GibberRMulticastSender *sender,
1822 GibberRMulticastPacket *packet)
1823 {
1824 GibberRMulticastSenderPrivate *priv =
1825 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1826 gint diff;
1827
1828 g_assert (sender->id == packet->sender);
1829
1830 if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING)
1831 {
1832 /* Don't know where to start, so ignore..
1833 * A potential optimisation would be to cache a limited amount anyway, so
1834 * we don't have to repair them if we should have catched these anyways
1835 * */
1836 return;
1837 }
1838
1839 diff = gibber_r_multicast_packet_diff (sender->next_output_packet,
1840 packet->packet_id);
1841
1842 if (diff >= 0 && diff < PACKET_CACHE_SIZE) {
1843 insert_packet (sender, packet);
1844 return;
1845 }
1846
1847 if (diff < 0 && gibber_r_multicast_packet_diff (priv->first_packet,
1848 packet->packet_id) >= 0)
1849 {
1850 /* We already had this one, silently ignore */
1851 DEBUG_SENDER (sender, "Detect resent of packet 0x%x",
1852 packet->packet_id);
1853 return;
1854 }
1855
1856 DEBUG_SENDER (sender, "Packet 0x%x out of range, dropping (%x %x %x)",
1857 packet->packet_id, priv->first_packet,
1858 sender->next_output_packet, sender->next_input_packet);
1859 }
1860
1861 gboolean
gibber_r_multicast_sender_repair_request(GibberRMulticastSender * sender,guint32 id)1862 gibber_r_multicast_sender_repair_request (GibberRMulticastSender *sender,
1863 guint32 id)
1864 {
1865 GibberRMulticastSenderPrivate *priv =
1866 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1867 gint diff;
1868 PacketInfo *info;
1869
1870 if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING)
1871 {
1872 DEBUG_SENDER (sender, "ignore repair request");
1873 return FALSE;
1874 }
1875
1876 info = g_hash_table_lookup (priv->packet_cache, &id);
1877 if (info != NULL && info->packet != NULL)
1878 {
1879 schedule_do_repair (sender, id);
1880 return TRUE;
1881 }
1882
1883 diff = gibber_r_multicast_packet_diff (sender->next_output_packet, id);
1884
1885 if (diff >= 0 && diff < PACKET_CACHE_SIZE)
1886 {
1887 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
1888 /* Beyond stopped state we only send out repairs for packets we have */
1889 return FALSE;
1890
1891 if (info == NULL)
1892 {
1893 guint32 i;
1894
1895 for (i = sender->next_output_packet ; i != id + 1; i++)
1896 {
1897 schedule_repair (sender, i);
1898 }
1899 }
1900 else
1901 {
1902 /* else we already knew about the packets existance, but didn't see
1903 the packet just yet. Which means we already have a repair timeout
1904 running */
1905 g_assert (info->timeout != 0);
1906 /* Reschedule the repair */
1907 g_source_remove (info->timeout);
1908 info->timeout = 0;
1909 schedule_repair (sender, id);
1910 }
1911
1912 return TRUE;
1913 }
1914
1915 DEBUG_SENDER (sender, "Repair request packet 0x%x out of range, ignoring",
1916 id);
1917
1918 return FALSE;
1919 }
1920
1921 gboolean
gibber_r_multicast_sender_seen(GibberRMulticastSender * sender,guint32 id)1922 gibber_r_multicast_sender_seen (GibberRMulticastSender *sender, guint32 id)
1923 {
1924 gint diff;
1925 guint32 i, last;
1926
1927 g_assert (sender != NULL);
1928 DEBUG_SENDER(sender, "Seen next packet 0x%x", id);
1929
1930 if (sender->state < GIBBER_R_MULTICAST_SENDER_STATE_PREPARING
1931 || sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_UNKNOWN_FAILED)
1932 {
1933 return FALSE;
1934 }
1935
1936 diff = gibber_r_multicast_packet_diff (sender->next_input_packet, id);
1937 if (diff < 0)
1938 return TRUE;
1939
1940 last = sender->next_output_packet + PACKET_CACHE_SIZE;
1941
1942 /* Ensure that we don't overfill the CACHE */
1943 last = gibber_r_multicast_packet_diff (last, id) > 0 ? last : id;
1944
1945 for (i = sender->next_input_packet; i != last; i ++)
1946 {
1947 schedule_repair (sender, i);
1948 }
1949 return FALSE;
1950 }
1951
1952 void
gibber_r_multicast_senders_updated(GibberRMulticastSender * sender)1953 gibber_r_multicast_senders_updated (GibberRMulticastSender *sender)
1954 {
1955 pop_packets (sender);
1956 }
1957
1958 void
gibber_r_multicast_sender_whois_push(GibberRMulticastSender * sender,const GibberRMulticastPacket * packet)1959 gibber_r_multicast_sender_whois_push (GibberRMulticastSender *sender,
1960 const GibberRMulticastPacket *packet)
1961 {
1962 GibberRMulticastSenderPrivate *priv =
1963 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
1964
1965 switch (packet->type) {
1966 case PACKET_TYPE_WHOIS_REQUEST:
1967 g_assert (packet->data.whois_request.sender_id == sender->id);
1968
1969 if (sender->name != NULL)
1970 {
1971 if (priv->whois_timer == 0)
1972 {
1973 gint timeout = g_random_int_range (MIN_WHOIS_REPLY_TIMEOUT,
1974 MAX_WHOIS_REPLY_TIMEOUT);
1975 priv->whois_timer =
1976 g_timeout_add (timeout, do_whois_reply, sender);
1977 DEBUG_SENDER (sender, "Scheduled whois reply in %d ms", timeout);
1978 }
1979 }
1980 else
1981 {
1982 schedule_whois_request (sender, TRUE);
1983 }
1984 break;
1985 case PACKET_TYPE_WHOIS_REPLY:
1986 g_assert (packet->sender == sender->id);
1987
1988 if (sender->name == NULL)
1989 {
1990 name_discovered (sender, packet->data.whois_reply.sender_name);
1991 }
1992 else
1993 {
1994 /* FIXME: collision detection */
1995 stop_whois_discovery (sender);
1996 }
1997
1998 pop_packets (sender);
1999 break;
2000 default:
2001 g_assert_not_reached ();
2002 }
2003 }
2004
2005 void
gibber_r_multicast_sender_set_packet_repeat(GibberRMulticastSender * sender,guint32 packet_id,gboolean repeat)2006 gibber_r_multicast_sender_set_packet_repeat (GibberRMulticastSender *sender,
2007 guint32 packet_id, gboolean repeat)
2008 {
2009 GibberRMulticastSenderPrivate *priv =
2010 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2011
2012 PacketInfo *info;
2013
2014 info = g_hash_table_lookup (priv->packet_cache, &packet_id);
2015 g_assert (info != NULL && info->packet != NULL);
2016
2017 if (info->repeating == repeat)
2018 {
2019 return;
2020 }
2021
2022 info->repeating = repeat;
2023
2024 if (repeat)
2025 {
2026 if (info->timeout == 0)
2027 schedule_do_repair (sender, packet_id);
2028 }
2029 else
2030 {
2031 packet_info_try_gc (sender, info);
2032 }
2033
2034 /* FIXME: If repeat is turned off, we repeat it at least once more as there
2035 * might have been a repair request after the last repeating.. This is
2036 * ofcourse suboptimal */
2037 }
2038
2039 guint
gibber_r_multicast_sender_packet_cache_size(GibberRMulticastSender * sender)2040 gibber_r_multicast_sender_packet_cache_size ( GibberRMulticastSender *sender)
2041 {
2042 GibberRMulticastSenderPrivate *priv =
2043 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2044
2045 /* The important cache size is until our cutoff point, which can be less
2046 * then the last packet we actually did receive from this sender */
2047 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_FAILED)
2048 return gibber_r_multicast_packet_diff (priv->first_packet,
2049 priv->end_point);
2050
2051 return gibber_r_multicast_packet_diff (priv->first_packet,
2052 sender->next_input_packet);
2053 }
2054
2055 static AckInfo *
gibber_r_multicast_sender_get_ackinfo(GibberRMulticastSender * sender,guint32 sender_id)2056 gibber_r_multicast_sender_get_ackinfo (GibberRMulticastSender *sender,
2057 guint32 sender_id)
2058 {
2059 GibberRMulticastSenderPrivate *priv =
2060 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2061
2062 return (AckInfo *) g_hash_table_lookup (priv->acks, &sender_id);
2063 }
2064
2065 void
gibber_r_multicast_sender_ack(GibberRMulticastSender * sender,guint32 ack)2066 gibber_r_multicast_sender_ack (GibberRMulticastSender *sender, guint32 ack)
2067 {
2068 guint32 i;
2069 GibberRMulticastSenderPrivate *priv =
2070 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE (sender);
2071
2072 if (gibber_r_multicast_packet_diff (priv->first_packet, ack) < 0)
2073 {
2074 return;
2075 }
2076
2077 for (i = priv->first_packet ; i != ack; i++)
2078 {
2079 PacketInfo *info;
2080
2081 info = g_hash_table_lookup (priv->packet_cache, &i);
2082 if (info == NULL)
2083 continue;
2084
2085 info->acked = TRUE;
2086 packet_info_try_gc (sender, info);
2087 }
2088 }
2089
2090
2091
2092 /* Tell the sender to not signal data starting from this packet */
2093 void
gibber_r_multicast_sender_hold_data(GibberRMulticastSender * sender,guint32 packet_id)2094 gibber_r_multicast_sender_hold_data (GibberRMulticastSender *sender,
2095 guint32 packet_id)
2096 {
2097 GibberRMulticastSenderPrivate *priv =
2098 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(sender);
2099
2100 priv->holding_data = TRUE;
2101 priv->holding_point = packet_id;
2102 DEBUG_SENDER (sender, "Holding data starting at %x", packet_id);
2103
2104 /* Pop packets in case the holding_point moved forward */
2105 pop_packets (sender);
2106 }
2107
2108 /* Stop holding back data of the sender */
2109 void
gibber_r_multicast_sender_release_data(GibberRMulticastSender * sender)2110 gibber_r_multicast_sender_release_data (GibberRMulticastSender *sender)
2111 {
2112 GibberRMulticastSenderPrivate *priv =
2113 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(sender);
2114
2115 DEBUG_SENDER (sender, "Releasing data");
2116 priv->holding_data = FALSE;
2117 pop_packets (sender);
2118 }
2119
2120 static void
stop_packet(gpointer key,gpointer value,gpointer user_data)2121 stop_packet (gpointer key, gpointer value, gpointer user_data)
2122 {
2123 PacketInfo *p = (PacketInfo *) value;
2124
2125 if (p->timeout != 0)
2126 {
2127 g_source_remove (p->timeout);
2128 p->timeout = 0;
2129 }
2130 }
2131
2132 void
gibber_r_multicast_sender_stop(GibberRMulticastSender * sender)2133 gibber_r_multicast_sender_stop (GibberRMulticastSender *sender)
2134 {
2135 GibberRMulticastSenderPrivate *priv =
2136 GIBBER_R_MULTICAST_SENDER_GET_PRIVATE(sender);
2137
2138 if (sender->state >= GIBBER_R_MULTICAST_SENDER_STATE_STOPPED)
2139 return;
2140
2141 if (priv->whois_timer != 0)
2142 {
2143 g_source_remove (priv->whois_timer);
2144 priv->whois_timer = 0;
2145 }
2146
2147 g_hash_table_foreach (priv->packet_cache, stop_packet, NULL);
2148 set_state (sender, GIBBER_R_MULTICAST_SENDER_STATE_STOPPED);
2149 }
2150
2151 void
_gibber_r_multicast_TEST_sender_fail(GibberRMulticastSender * sender)2152 _gibber_r_multicast_TEST_sender_fail (GibberRMulticastSender *sender)
2153 {
2154 signal_failure (sender);
2155 }
2156
2157