1 /*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2009 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36 /*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
 * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * http://citeseer.ist.psu.edu/amir95totem.html
42 *
43 * The deviations from the above published protocols are:
44 * - encryption of message contents with nss
 * - authentication of message contents with SHA1/HMAC
46 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
47 * usage on 1.6ghz xeon from 35% to less then .1 % as measured by top
48 */
49
50 #include <config.h>
51
52 #include <assert.h>
53 #ifdef HAVE_ALLOCA_H
54 #include <alloca.h>
55 #endif
56 #include <sys/mman.h>
57 #include <sys/types.h>
58 #include <sys/stat.h>
59 #include <sys/socket.h>
60 #include <netdb.h>
61 #include <sys/un.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #include <unistd.h>
67 #include <fcntl.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <errno.h>
71 #include <sched.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <sys/poll.h>
75 #include <sys/uio.h>
76 #include <limits.h>
77
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
81
82 #include <corosync/swab.h>
83 #include <corosync/sq.h>
84 #include <corosync/list.h>
85
86 #define LOGSYS_UTILS_ONLY 1
87 #include <corosync/logsys.h>
88
89 #include "totemsrp.h"
90 #include "totemrrp.h"
91 #include "totemnet.h"
92
93 #include "cs_queue.h"
94
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99 #define MAXIOVS 5
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000 /* bytes */
102 #define LEAVE_DUMMY_NODEID 0
103
104 /*
105 * Rollover handling:
106 * SEQNO_START_MSG is the starting sequence number after a new configuration
107 * This should remain zero, unless testing overflow in which case
108 * 0x7ffff000 and 0xfffff000 are good starting values.
109 *
110 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
111 * for a token. This should remain zero, unless testing overflow in which
 * case 0x7fffff00 or 0xffffff00 are good starting values.
113 */
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
116
117 /*
 * These can be used to test different rollover points
119 * #define SEQNO_START_MSG 0xfffffe00
120 * #define SEQNO_START_TOKEN 0xfffffe00
121 */
122
123 /*
124 * These can be used to test the error recovery algorithms
125 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
126 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
127 * #define TEST_DROP_MCAST_PERCENTAGE 50
128 * #define TEST_RECOVERY_MSG_COUNT 300
129 */
130
131 /*
132 * we compare incoming messages to determine if their endian is
133 * different - if so convert them
134 *
135 * do not change
136 */
137 #define ENDIAN_LOCAL 0xff22
138
/*
 * On-wire message types, carried in struct message_header.type.
 * Values are part of the wire protocol and also index
 * totemsrp_message_handlers.handler_functions[] - do not renumber.
 */
enum message_type {
	MESSAGE_TYPE_ORF_TOKEN = 0,		/* Ordering, Reliability, Flow (ORF) control Token */
	MESSAGE_TYPE_MCAST = 1,			/* ring ordered multicast message */
	MESSAGE_TYPE_MEMB_MERGE_DETECT = 2,	/* merge rings if there are available rings */
	MESSAGE_TYPE_MEMB_JOIN = 3,		/* membership join message */
	MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4,	/* membership commit token */
	MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5,	/* cancel the holding of the token */
};
147
/*
 * Marks whether a message body is wrapped inside another (recovery)
 * message; carried in struct message_header.encapsulated.
 */
enum encapsulation_type {
	MESSAGE_ENCAPSULATED = 1,
	MESSAGE_NOT_ENCAPSULATED = 2
};
152
153 /*
154 * New membership algorithm local variables
155 */
/*
 * One consensus-tracking entry: 'set' is non-zero once agreement
 * involving 'addr' has been recorded during membership formation.
 */
struct consensus_list_item {
	struct srp_addr addr;
	int set;
};
160
161
/*
 * A registered token callback, linked into one of the instance's
 * token_callback_*_listhead lists.
 */
struct token_callback_instance {
	struct list_head list;			/* list linkage */
	int (*callback_fn) (enum totem_callback_token_type type, const void *);
	enum totem_callback_token_type callback_type;
	int delete;				/* NOTE(review): appears to flag one-shot removal - confirm against token_callbacks_execute */
	void *data;				/* opaque argument handed back to callback_fn */
};
169
170
/*
 * File descriptor pair for one network interface: one socket for
 * multicast data traffic and one for token traffic (presumably
 * unicast - confirm against the totemnet layer).
 */
struct totemsrp_socket {
	int mcast;
	int token;
};
175
/*
 * Common header prefixed to every on-wire message.  Packed: this is
 * exact wire layout, so field order and sizes must not change.
 */
struct message_header {
	char type;			/* enum message_type */
	char encapsulated;		/* enum encapsulation_type */
	unsigned short endian_detector;	/* compared against ENDIAN_LOCAL to detect foreign byte order */
	unsigned int nodeid;		/* originating node */
} __attribute__((packed));
182
183
/*
 * Header of a ring-ordered multicast message (MESSAGE_TYPE_MCAST).
 * Wire format - packed, do not reorder fields.
 */
struct mcast {
	struct message_header header;
	struct srp_addr system_from;	/* sender address */
	unsigned int seq;		/* global ring sequence number */
	int this_seqno;
	struct memb_ring_id ring_id;	/* ring this message belongs to */
	unsigned int node_id;
	int guarantee;
} __attribute__((packed));
193
194
/*
 * One retransmit request carried on the ORF token: the ring and
 * sequence number of a missing message.
 */
struct rtr_item {
	struct memb_ring_id ring_id;
	unsigned int seq;
}__attribute__((packed));
199
200
/*
 * The ORF (Ordering, Reliability, Flow control) token circulated
 * around the ring.  Wire format - packed.  rtr_list is a trailing
 * variable-length array of rtr_list_entries retransmit requests
 * (pre-C99 zero-length-array idiom).
 */
struct orf_token {
	struct message_header header;
	unsigned int seq;		/* highest sequence number multicast so far */
	unsigned int token_seq;		/* token rotation sequence number */
	unsigned int aru;		/* all-received-up-to sequence number */
	unsigned int aru_addr;		/* nodeid that set aru */
	struct memb_ring_id ring_id;
	unsigned int backlog;		/* pending messages on the ring */
	unsigned int fcc;		/* flow control count */
	int retrans_flg;
	int rtr_list_entries;		/* number of valid entries in rtr_list */
	struct rtr_item rtr_list[0];
}__attribute__((packed));
214
215
/*
 * Membership join message (MESSAGE_TYPE_MEMB_JOIN).  Wire format -
 * packed.  The proc and failed lists follow the fixed header;
 * end_of_memb_join marks where the dynamic portion begins.
 */
struct memb_join {
	struct message_header header;
	struct srp_addr system_from;
	unsigned int proc_list_entries;		/* length of trailing proc_list */
	unsigned int failed_list_entries;	/* length of trailing failed_list */
	unsigned long long ring_seq;
	unsigned char end_of_memb_join[0];
	/*
	 * These parts of the data structure are dynamic:
	 * struct srp_addr proc_list[];
	 * struct srp_addr failed_list[];
	 */
} __attribute__((packed));
229
230
/*
 * Merge detect message (MESSAGE_TYPE_MEMB_MERGE_DETECT), announcing
 * this ring's identity so partitioned rings can merge.  Wire format.
 */
struct memb_merge_detect {
	struct message_header header;
	struct srp_addr system_from;
	struct memb_ring_id ring_id;
} __attribute__((packed));
236
237
/*
 * Token hold cancel message (MESSAGE_TYPE_TOKEN_HOLD_CANCEL): asks
 * the token holder to stop holding and forward the token.  Wire format.
 */
struct token_hold_cancel {
	struct message_header header;
	struct memb_ring_id ring_id;
} __attribute__((packed));
242
243
/*
 * Per-member slot in the commit token's trailing memb_list: the
 * member's old ring and its delivery progress on that ring.
 */
struct memb_commit_token_memb_entry {
	struct memb_ring_id ring_id;
	unsigned int aru;		/* all-received-up-to on the old ring */
	unsigned int high_delivered;	/* highest message delivered to the app */
	unsigned int received_flg;
}__attribute__((packed));
250
251
/*
 * Membership commit token (MESSAGE_TYPE_MEMB_COMMIT_TOKEN), circulated
 * to form a new ring.  Wire format - packed; end_of_commit_token marks
 * the start of the two dynamic trailing arrays described below.
 */
struct memb_commit_token {
	struct message_header header;
	unsigned int token_seq;
	struct memb_ring_id ring_id;	/* identity of the ring being formed */
	unsigned int retrans_flg;
	int memb_index;			/* position of the current holder in the member list */
	int addr_entries;		/* number of entries in the trailing arrays */
	unsigned char end_of_commit_token[0];
	/*
	 * These parts of the data structure are dynamic:
	 *
	 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
	 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
	 */
}__attribute__((packed));
267
/*
 * Element stored in the cs_queue message queues: an owned mcast
 * message plus its total length in bytes.
 */
struct message_item {
	struct mcast *mcast;
	unsigned int msg_len;
};
272
/*
 * Element stored in the sq sort queues; layout intentionally mirrors
 * struct message_item (message pointer + length).
 */
struct sort_queue_item {
	struct mcast *mcast;
	unsigned int msg_len;
};
277
/*
 * Totem membership protocol states (see memb_state_*_enter handlers).
 */
enum memb_state {
	MEMB_STATE_OPERATIONAL = 1,	/* normal operation on an established ring */
	MEMB_STATE_GATHER = 2,		/* collecting join messages to form consensus */
	MEMB_STATE_COMMIT = 3,		/* circulating the commit token */
	MEMB_STATE_RECOVERY = 4		/* retransmitting old-ring messages on the new ring */
};
284
/*
 * Per-instance state for the Totem Single Ring Protocol.  Allocated and
 * zero-initialized in totemsrp_initialize()/totemsrp_instance_initialize()
 * and passed as the context to every handler, timer, and callback.
 * Field order is preserved from the original layout.
 */
struct totemsrp_instance {
	int iface_changes;

	int failed_to_recv;

	/*
	 * Flow control mcasts and remcasts on last and current orf_token
	 */
	int fcc_remcast_last;

	int fcc_mcast_last;

	int fcc_remcast_current;

	/*
	 * Membership lists; each *_entries counter below gives the number
	 * of valid leading entries in the corresponding array.
	 */
	struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];

	int consensus_list_entries;

	struct srp_addr my_id;

	struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];

	struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];

	struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];

	struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];

	struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];

	struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];

	struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];

	/* nodeids (not srp_addrs), unlike the lists above */
	unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX];

	int my_proc_list_entries;

	int my_failed_list_entries;

	int my_new_memb_entries;

	int my_trans_memb_entries;

	int my_memb_entries;

	int my_deliver_memb_entries;

	int my_left_memb_entries;

	int my_leave_memb_entries;

	struct memb_ring_id my_ring_id;

	struct memb_ring_id my_old_ring_id;

	int my_aru_count;

	int my_merge_detect_timeout_outstanding;

	unsigned int my_last_aru;

	int my_seq_unchanged;

	int my_received_flg;

	unsigned int my_high_seq_received;

	unsigned int my_install_seq;

	int my_rotation_counter;

	int my_set_retrans_flg;

	int my_retrans_flg_count;

	unsigned int my_high_ring_delivered;

	/* milliseconds; computed in totemsrp_initialize when heartbeat is enabled */
	int heartbeat_timeout;

	/*
	 * Queues used to order, deliver, and recover messages
	 */
	struct cs_queue new_message_queue;

	struct cs_queue new_message_queue_trans;

	struct cs_queue retrans_message_queue;

	struct sq regular_sort_queue;

	struct sq recovery_sort_queue;

	/*
	 * Received up to and including
	 */
	unsigned int my_aru;

	unsigned int my_high_delivered;

	struct list_head token_callback_received_listhead;

	struct list_head token_callback_sent_listhead;

	/* copy of the last ORF token, kept for retransmission */
	char orf_token_retransmit[TOKEN_SIZE_MAX];

	int orf_token_retransmit_size;

	unsigned int my_token_seq;

	/*
	 * Timers
	 */
	qb_loop_timer_handle timer_pause_timeout;

	qb_loop_timer_handle timer_orf_token_timeout;

	qb_loop_timer_handle timer_orf_token_retransmit_timeout;

	qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout;

	qb_loop_timer_handle timer_merge_detect_timeout;

	qb_loop_timer_handle memb_timer_state_gather_join_timeout;

	qb_loop_timer_handle memb_timer_state_gather_consensus_timeout;

	qb_loop_timer_handle memb_timer_state_commit_timeout;

	qb_loop_timer_handle timer_heartbeat_timeout;

	/*
	 * Function and data used to log messages
	 */
	int totemsrp_log_level_security;

	int totemsrp_log_level_error;

	int totemsrp_log_level_warning;

	int totemsrp_log_level_notice;

	int totemsrp_log_level_debug;

	int totemsrp_log_level_trace;

	int totemsrp_subsys_id;

	void (*totemsrp_log_printf) (
		int level,
		int subsys,
		const char *function,
		const char *file,
		int line,
		const char *format, ...)__attribute__((format(printf, 6, 7)));;

	enum memb_state memb_state;

	//TODO	struct srp_addr next_memb;

	qb_loop_t *totemsrp_poll_handle;

	struct totem_ip_address mcast_address;

	/*
	 * Upcalls into the layer above; all assigned in totemsrp_initialize
	 */
	void (*totemsrp_deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*totemsrp_confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	void (*totemsrp_service_ready_fn) (void);

	void (*totemsrp_waiting_trans_ack_cb_fn) (
		int waiting_trans_ack);

	/*
	 * Ring id persistence hooks, taken from totem_config
	 */
	void (*memb_ring_id_create_or_load) (
		struct memb_ring_id *memb_ring_id,
		const struct totem_ip_address *addr);

	void (*memb_ring_id_store) (
		const struct memb_ring_id *memb_ring_id,
		const struct totem_ip_address *addr);

	int global_seqno;

	int my_token_held;

	unsigned long long token_ring_id_seq;

	unsigned int last_released;

	/* set to -1 (all bits) in totemsrp_instance_initialize */
	unsigned int set_aru;

	int old_ring_state_saved;

	int old_ring_state_aru;

	unsigned int old_ring_state_high_seq_received;

	unsigned int my_last_seq;

	struct timeval tv_old;

	/* opaque handle into the redundant ring (totemrrp) layer */
	void *totemrrp_context;

	struct totem_config *totem_config;

	unsigned int use_heartbeat;

	unsigned int my_trc;

	unsigned int my_pbl;

	unsigned int my_cbl;

	/* nanoseconds; compared in pause_flush to detect scheduling pauses */
	uint64_t pause_timestamp;

	/* points into commit_token_storage below */
	struct memb_commit_token *commit_token;

	totemsrp_stats_t stats;

	uint32_t orf_token_discard;

	uint32_t originated_orf_token;

	uint32_t threaded_mode_enabled;

	uint32_t waiting_trans_ack;

	int flushing;

	void * token_recv_event_handle;
	void * token_sent_event_handle;
	/* backing store for *commit_token (fixed maximum size) */
	char commit_token_storage[40000];
};
527
/*
 * Dispatch table type: handler_functions[] is indexed by
 * enum message_type, so entry order must match that enum;
 * count is the number of valid handlers.
 */
struct message_handlers {
	int count;
	int (*handler_functions[6]) (
		struct totemsrp_instance *instance,
		const void *msg,
		size_t msg_len,
		int endian_conversion_needed);
};
536
/*
 * Reason codes for entering the GATHER membership state.  The numeric
 * values index gather_state_from_desc[], so the two must stay in sync
 * and TOTEMSRP_GSFROM_MAX must track the highest value.
 */
enum gather_state_from {
	TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT = 0,
	TOTEMSRP_GSFROM_GATHER_MISSING1 = 1,
	TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE = 2,
	TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED = 3,
	TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE = 4,
	TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE = 5,
	TOTEMSRP_GSFROM_FAILED_TO_RECEIVE = 6,
	TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE = 7,
	TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE = 8,
	TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE = 9,
	TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE = 10,
	TOTEMSRP_GSFROM_MERGE_DURING_JOIN = 11,
	TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE = 12,
	TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE = 13,
	TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY = 14,
	TOTEMSRP_GSFROM_INTERFACE_CHANGE = 15,
	TOTEMSRP_GSFROM_MAX = TOTEMSRP_GSFROM_INTERFACE_CHANGE,
};
556
/*
 * Human readable descriptions of the gather reasons, indexed by
 * enum gather_state_from (see gsfrom_to_msg()).  Designated
 * initializers keep the mapping explicit.
 */
const char* gather_state_from_desc [] = {
	[TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
	[TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
	[TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
	[TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
	[TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
	[TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
	[TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
	[TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
	[TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
	[TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
	[TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
	[TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
	[TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
	[TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
	[TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
	[TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
};
575
576 /*
577 * forward decls
578 */
579 static int message_handler_orf_token (
580 struct totemsrp_instance *instance,
581 const void *msg,
582 size_t msg_len,
583 int endian_conversion_needed);
584
585 static int message_handler_mcast (
586 struct totemsrp_instance *instance,
587 const void *msg,
588 size_t msg_len,
589 int endian_conversion_needed);
590
591 static int message_handler_memb_merge_detect (
592 struct totemsrp_instance *instance,
593 const void *msg,
594 size_t msg_len,
595 int endian_conversion_needed);
596
597 static int message_handler_memb_join (
598 struct totemsrp_instance *instance,
599 const void *msg,
600 size_t msg_len,
601 int endian_conversion_needed);
602
603 static int message_handler_memb_commit_token (
604 struct totemsrp_instance *instance,
605 const void *msg,
606 size_t msg_len,
607 int endian_conversion_needed);
608
609 static int message_handler_token_hold_cancel (
610 struct totemsrp_instance *instance,
611 const void *msg,
612 size_t msg_len,
613 int endian_conversion_needed);
614
615 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
616
617 static unsigned int main_msgs_missing (void);
618
619 static void main_token_seqid_get (
620 const void *msg,
621 unsigned int *seqid,
622 unsigned int *token_is);
623
624 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
625
626 static void srp_addr_to_nodeid (
627 unsigned int *nodeid_out,
628 struct srp_addr *srp_addr_in,
629 unsigned int entries);
630
631 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
632
633 static void memb_leave_message_send (struct totemsrp_instance *instance);
634
635 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
636 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
637 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
638 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
639 int fcc_mcasts_allowed);
640 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
641
642 static void memb_ring_id_set (struct totemsrp_instance *instance,
643 const struct memb_ring_id *ring_id);
644 static void target_set_completed (void *context);
645 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
646 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
647 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
648 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
649 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
650 static int token_hold_cancel_send (struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
652 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
653 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
654 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
655 static void memb_merge_detect_endian_convert (
656 const struct memb_merge_detect *in,
657 struct memb_merge_detect *out);
658 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (void *data);
660 static void timer_function_pause_timeout (void *data);
661 static void timer_function_heartbeat_timeout (void *data);
662 static void timer_function_token_retransmit_timeout (void *data);
663 static void timer_function_token_hold_retransmit_timeout (void *data);
664 static void timer_function_merge_detect_timeout (void *data);
665 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
666 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
667 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
668
669 void main_deliver_fn (
670 void *context,
671 const void *msg,
672 unsigned int msg_len);
673
674 void main_iface_change_fn (
675 void *context,
676 const struct totem_ip_address *iface_address,
677 unsigned int iface_no);
678
/*
 * Handler table for incoming messages.  Entry order must match
 * enum message_type, which is used as the dispatch index.
 */
struct message_handlers totemsrp_message_handlers = {
	6,
	{
		message_handler_orf_token,		/* MESSAGE_TYPE_ORF_TOKEN */
		message_handler_mcast,			/* MESSAGE_TYPE_MCAST */
		message_handler_memb_merge_detect,	/* MESSAGE_TYPE_MEMB_MERGE_DETECT */
		message_handler_memb_join,		/* MESSAGE_TYPE_MEMB_JOIN */
		message_handler_memb_commit_token,	/* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
		message_handler_token_hold_cancel	/* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
	}
};
690
/*
 * Logging helpers.  Both macros route through the instance's configured
 * totemsrp_log_printf callback, so they may only be used where a
 * "struct totemsrp_instance *instance" is in scope.
 *
 * BUG FIX: log_printf previously ended its do { ... } while (0) with a
 * trailing semicolon.  That extra ';' terminates an enclosing "if"
 * statement, so "if (x) log_printf(...); else ..." failed to compile.
 * The wrapper must expand to a single statement with no terminator.
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemsrp_log_printf (			\
		level, instance->totemsrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
#define LOGSYS_PERROR(err_num, level, fmt, args...)						\
do {												\
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN];						\
	const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str));	\
	instance->totemsrp_log_printf (								\
		level, instance->totemsrp_subsys_id,						\
		__FUNCTION__, __FILE__, __LINE__,						\
		fmt ": %s (%d)\n", ##args, _error_ptr, err_num);				\
} while(0)
707
gsfrom_to_msg(enum gather_state_from gsfrom)708 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
709 {
710 if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
711 return gather_state_from_desc[gsfrom];
712 }
713 else {
714 return "UNKNOWN";
715 }
716 }
717
totemsrp_instance_initialize(struct totemsrp_instance * instance)718 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
719 {
720 memset (instance, 0, sizeof (struct totemsrp_instance));
721
722 list_init (&instance->token_callback_received_listhead);
723
724 list_init (&instance->token_callback_sent_listhead);
725
726 instance->my_received_flg = 1;
727
728 instance->my_token_seq = SEQNO_START_TOKEN - 1;
729
730 instance->memb_state = MEMB_STATE_OPERATIONAL;
731
732 instance->set_aru = -1;
733
734 instance->my_aru = SEQNO_START_MSG;
735
736 instance->my_high_seq_received = SEQNO_START_MSG;
737
738 instance->my_high_delivered = SEQNO_START_MSG;
739
740 instance->orf_token_discard = 0;
741
742 instance->originated_orf_token = 0;
743
744 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
745
746 instance->my_id.no_addrs = INTERFACE_MAX;
747
748 instance->waiting_trans_ack = 1;
749 }
750
main_token_seqid_get(const void * msg,unsigned int * seqid,unsigned int * token_is)751 static void main_token_seqid_get (
752 const void *msg,
753 unsigned int *seqid,
754 unsigned int *token_is)
755 {
756 const struct orf_token *token = msg;
757
758 *seqid = 0;
759 *token_is = 0;
760 if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
761 *seqid = token->token_seq;
762 *token_is = 1;
763 }
764 }
765
/*
 * Number of messages known missing by the main dispatch layer.
 * Not yet implemented; always reports zero.
 */
static unsigned int main_msgs_missing (void)
{
	// TODO
	unsigned int missing = 0;

	return (missing);
}
771
pause_flush(struct totemsrp_instance * instance)772 static int pause_flush (struct totemsrp_instance *instance)
773 {
774 uint64_t now_msec;
775 uint64_t timestamp_msec;
776 int res = 0;
777
778 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
779 timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
780
781 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
782 log_printf (instance->totemsrp_log_level_notice,
783 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
784 /*
785 * -1 indicates an error from recvmsg
786 */
787 do {
788 res = totemrrp_mcast_recv_empty (instance->totemrrp_context);
789 } while (res == -1);
790 }
791 return (res);
792 }
793
token_event_stats_collector(enum totem_callback_token_type type,const void * void_instance)794 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
795 {
796 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
797 uint32_t time_now;
798 unsigned long long nano_secs = qb_util_nano_current_get ();
799
800 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
801
802 if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
803 /* incr latest token the index */
804 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
805 instance->stats.latest_token = 0;
806 else
807 instance->stats.latest_token++;
808
809 if (instance->stats.earliest_token == instance->stats.latest_token) {
810 /* we have filled up the array, start overwriting */
811 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
812 instance->stats.earliest_token = 0;
813 else
814 instance->stats.earliest_token++;
815
816 instance->stats.token[instance->stats.earliest_token].rx = 0;
817 instance->stats.token[instance->stats.earliest_token].tx = 0;
818 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
819 }
820
821 instance->stats.token[instance->stats.latest_token].rx = time_now;
822 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
823 } else {
824 instance->stats.token[instance->stats.latest_token].tx = time_now;
825 }
826 return 0;
827 }
828
829 /*
830 * Exported interfaces
831 */
totemsrp_initialize(qb_loop_t * poll_handle,void ** srp_context,struct totem_config * totem_config,totemmrp_stats_t * stats,void (* deliver_fn)(unsigned int nodeid,const void * msg,unsigned int msg_len,int endian_conversion_required),void (* confchg_fn)(enum totem_configuration_type configuration_type,const unsigned int * member_list,size_t member_list_entries,const unsigned int * left_list,size_t left_list_entries,const unsigned int * joined_list,size_t joined_list_entries,const struct memb_ring_id * ring_id),void (* waiting_trans_ack_cb_fn)(int waiting_trans_ack))832 int totemsrp_initialize (
833 qb_loop_t *poll_handle,
834 void **srp_context,
835 struct totem_config *totem_config,
836 totemmrp_stats_t *stats,
837
838 void (*deliver_fn) (
839 unsigned int nodeid,
840 const void *msg,
841 unsigned int msg_len,
842 int endian_conversion_required),
843
844 void (*confchg_fn) (
845 enum totem_configuration_type configuration_type,
846 const unsigned int *member_list, size_t member_list_entries,
847 const unsigned int *left_list, size_t left_list_entries,
848 const unsigned int *joined_list, size_t joined_list_entries,
849 const struct memb_ring_id *ring_id),
850 void (*waiting_trans_ack_cb_fn) (
851 int waiting_trans_ack))
852 {
853 struct totemsrp_instance *instance;
854 int res;
855
856 instance = malloc (sizeof (struct totemsrp_instance));
857 if (instance == NULL) {
858 goto error_exit;
859 }
860
861 totemsrp_instance_initialize (instance);
862
863 instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
864 instance->totemsrp_waiting_trans_ack_cb_fn (1);
865
866 stats->srp = &instance->stats;
867 instance->stats.latest_token = 0;
868 instance->stats.earliest_token = 0;
869
870 instance->totem_config = totem_config;
871
872 /*
873 * Configure logging
874 */
875 instance->totemsrp_log_level_security = totem_config->totem_logging_configuration.log_level_security;
876 instance->totemsrp_log_level_error = totem_config->totem_logging_configuration.log_level_error;
877 instance->totemsrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
878 instance->totemsrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
879 instance->totemsrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
880 instance->totemsrp_log_level_trace = totem_config->totem_logging_configuration.log_level_trace;
881 instance->totemsrp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
882 instance->totemsrp_log_printf = totem_config->totem_logging_configuration.log_printf;
883
884 /*
885 * Configure totem store and load functions
886 */
887 instance->memb_ring_id_create_or_load = totem_config->totem_memb_ring_id_create_or_load;
888 instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
889
890 /*
891 * Initialize local variables for totemsrp
892 */
893 totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
894
895 /*
896 * Display totem configuration
897 */
898 log_printf (instance->totemsrp_log_level_debug,
899 "Token Timeout (%d ms) retransmit timeout (%d ms)",
900 totem_config->token_timeout, totem_config->token_retransmit_timeout);
901 log_printf (instance->totemsrp_log_level_debug,
902 "token hold (%d ms) retransmits before loss (%d retrans)",
903 totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
904 log_printf (instance->totemsrp_log_level_debug,
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
906 totem_config->join_timeout,
907 totem_config->send_join_timeout,
908 totem_config->consensus_timeout,
909
910 totem_config->merge_timeout);
911 log_printf (instance->totemsrp_log_level_debug,
912 "downcheck (%d ms) fail to recv const (%d msgs)",
913 totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
914 log_printf (instance->totemsrp_log_level_debug,
915 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916
917 log_printf (instance->totemsrp_log_level_debug,
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
919 totem_config->window_size, totem_config->max_messages);
920
921 log_printf (instance->totemsrp_log_level_debug,
922 "missed count const (%d messages)",
923 totem_config->miss_count_const);
924
925 log_printf (instance->totemsrp_log_level_debug,
926 "send threads (%d threads)", totem_config->threads);
927 log_printf (instance->totemsrp_log_level_debug,
928 "RRP token expired timeout (%d ms)",
929 totem_config->rrp_token_expired_timeout);
930 log_printf (instance->totemsrp_log_level_debug,
931 "RRP token problem counter (%d ms)",
932 totem_config->rrp_problem_count_timeout);
933 log_printf (instance->totemsrp_log_level_debug,
934 "RRP threshold (%d problem count)",
935 totem_config->rrp_problem_count_threshold);
936 log_printf (instance->totemsrp_log_level_debug,
937 "RRP multicast threshold (%d problem count)",
938 totem_config->rrp_problem_count_mcast_threshold);
939 log_printf (instance->totemsrp_log_level_debug,
940 "RRP automatic recovery check timeout (%d ms)",
941 totem_config->rrp_autorecovery_check_timeout);
942 log_printf (instance->totemsrp_log_level_debug,
943 "RRP mode set to %s.", instance->totem_config->rrp_mode);
944
945 log_printf (instance->totemsrp_log_level_debug,
946 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
947 log_printf (instance->totemsrp_log_level_debug,
948 "max_network_delay (%d ms)", totem_config->max_network_delay);
949
950
951 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
952 sizeof (struct message_item), instance->threaded_mode_enabled);
953
954 sq_init (&instance->regular_sort_queue,
955 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
956
957 sq_init (&instance->recovery_sort_queue,
958 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
959
960 instance->totemsrp_poll_handle = poll_handle;
961
962 instance->totemsrp_deliver_fn = deliver_fn;
963
964 instance->totemsrp_confchg_fn = confchg_fn;
965 instance->use_heartbeat = 1;
966
967 timer_function_pause_timeout (instance);
968
969 if ( totem_config->heartbeat_failures_allowed == 0 ) {
970 log_printf (instance->totemsrp_log_level_debug,
971 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
972 instance->use_heartbeat = 0;
973 }
974
975 if (instance->use_heartbeat) {
976 instance->heartbeat_timeout
977 = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
978 + totem_config->max_network_delay;
979
980 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
981 log_printf (instance->totemsrp_log_level_debug,
982 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
983 instance->heartbeat_timeout,
984 totem_config->token_timeout);
985 log_printf (instance->totemsrp_log_level_debug,
986 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987 log_printf (instance->totemsrp_log_level_debug,
988 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
989 instance->use_heartbeat = 0;
990 }
991 else {
992 log_printf (instance->totemsrp_log_level_debug,
993 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
994 }
995 }
996
997 res = totemrrp_initialize (
998 poll_handle,
999 &instance->totemrrp_context,
1000 totem_config,
1001 stats->srp,
1002 instance,
1003 main_deliver_fn,
1004 main_iface_change_fn,
1005 main_token_seqid_get,
1006 main_msgs_missing,
1007 target_set_completed);
1008 if (res == -1) {
1009 goto error_exit;
1010 }
1011
1012 /*
1013 * Must have net_mtu adjusted by totemrrp_initialize first
1014 */
1015 cs_queue_init (&instance->new_message_queue,
1016 MESSAGE_QUEUE_MAX,
1017 sizeof (struct message_item), instance->threaded_mode_enabled);
1018
1019 cs_queue_init (&instance->new_message_queue_trans,
1020 MESSAGE_QUEUE_MAX,
1021 sizeof (struct message_item), instance->threaded_mode_enabled);
1022
1023 totemsrp_callback_token_create (instance,
1024 &instance->token_recv_event_handle,
1025 TOTEM_CALLBACK_TOKEN_RECEIVED,
1026 0,
1027 token_event_stats_collector,
1028 instance);
1029 totemsrp_callback_token_create (instance,
1030 &instance->token_sent_event_handle,
1031 TOTEM_CALLBACK_TOKEN_SENT,
1032 0,
1033 token_event_stats_collector,
1034 instance);
1035 *srp_context = instance;
1036 return (0);
1037
1038 error_exit:
1039 return (-1);
1040 }
1041
totemsrp_finalize(void * srp_context)1042 void totemsrp_finalize (
1043 void *srp_context)
1044 {
1045 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1046
1047
1048 memb_leave_message_send (instance);
1049 totemrrp_finalize (instance->totemrrp_context);
1050 cs_queue_free (&instance->new_message_queue);
1051 cs_queue_free (&instance->new_message_queue_trans);
1052 cs_queue_free (&instance->retrans_message_queue);
1053 sq_free (&instance->regular_sort_queue);
1054 sq_free (&instance->recovery_sort_queue);
1055 free (instance);
1056 }
1057
/*
 * Return configured interfaces. interfaces is an array of totem_ip addresses allocated by the
 * caller, with interfaces_size items. iface_count is the final number of interfaces filled in by
 * this function.
 *
 * Returns 0 on success; otherwise -2 if the interfaces array is not big enough, and -1 if the
 * node was not found.
 */
totemsrp_ifaces_get(void * srp_context,unsigned int nodeid,struct totem_ip_address * interfaces,unsigned int interfaces_size,char *** status,unsigned int * iface_count)1066 int totemsrp_ifaces_get (
1067 void *srp_context,
1068 unsigned int nodeid,
1069 struct totem_ip_address *interfaces,
1070 unsigned int interfaces_size,
1071 char ***status,
1072 unsigned int *iface_count)
1073 {
1074 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1075 int res = 0;
1076 unsigned int found = 0;
1077 unsigned int i;
1078
1079 for (i = 0; i < instance->my_memb_entries; i++) {
1080 if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
1081 found = 1;
1082 break;
1083 }
1084 }
1085
1086 if (found) {
1087 *iface_count = instance->totem_config->interface_count;
1088
1089 if (interfaces_size >= *iface_count) {
1090 memcpy (interfaces, instance->my_memb_list[i].addr,
1091 sizeof (struct totem_ip_address) * *iface_count);
1092 } else {
1093 res = -2;
1094 }
1095
1096 goto finish;
1097 }
1098
1099 for (i = 0; i < instance->my_left_memb_entries; i++) {
1100 if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
1101 found = 1;
1102 break;
1103 }
1104 }
1105
1106 if (found) {
1107 *iface_count = instance->totem_config->interface_count;
1108
1109 if (interfaces_size >= *iface_count) {
1110 memcpy (interfaces, instance->my_left_memb_list[i].addr,
1111 sizeof (struct totem_ip_address) * *iface_count);
1112 } else {
1113 res = -2;
1114 }
1115 } else {
1116 res = -1;
1117 }
1118
1119 finish:
1120 totemrrp_ifaces_get (instance->totemrrp_context, status, NULL);
1121 return (res);
1122 }
1123
totemsrp_crypto_set(void * srp_context,const char * cipher_type,const char * hash_type)1124 int totemsrp_crypto_set (
1125 void *srp_context,
1126 const char *cipher_type,
1127 const char *hash_type)
1128 {
1129 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1130 int res;
1131
1132 res = totemrrp_crypto_set(instance->totemrrp_context, cipher_type, hash_type);
1133
1134 return (res);
1135 }
1136
1137
totemsrp_my_nodeid_get(void * srp_context)1138 unsigned int totemsrp_my_nodeid_get (
1139 void *srp_context)
1140 {
1141 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1142 unsigned int res;
1143
1144 res = instance->totem_config->interfaces[0].boundto.nodeid;
1145
1146 return (res);
1147 }
1148
totemsrp_my_family_get(void * srp_context)1149 int totemsrp_my_family_get (
1150 void *srp_context)
1151 {
1152 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1153 int res;
1154
1155 res = instance->totem_config->interfaces[0].boundto.family;
1156
1157 return (res);
1158 }
1159
1160
totemsrp_ring_reenable(void * srp_context)1161 int totemsrp_ring_reenable (
1162 void *srp_context)
1163 {
1164 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1165
1166 totemrrp_ring_reenable (instance->totemrrp_context,
1167 instance->totem_config->interface_count);
1168
1169 return (0);
1170 }
1171
1172
1173 /*
1174 * Set operations for use by the membership algorithm
1175 */
srp_addr_equal(const struct srp_addr * a,const struct srp_addr * b)1176 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1177 {
1178 unsigned int i;
1179 unsigned int res;
1180
1181 for (i = 0; i < 1; i++) {
1182 res = totemip_equal (&a->addr[i], &b->addr[i]);
1183 if (res == 0) {
1184 return (0);
1185 }
1186 }
1187 return (1);
1188 }
1189
srp_addr_copy(struct srp_addr * dest,const struct srp_addr * src)1190 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1191 {
1192 unsigned int i;
1193
1194 dest->no_addrs = src->no_addrs;
1195
1196 for (i = 0; i < INTERFACE_MAX; i++) {
1197 totemip_copy (&dest->addr[i], &src->addr[i]);
1198 }
1199 }
1200
srp_addr_to_nodeid(unsigned int * nodeid_out,struct srp_addr * srp_addr_in,unsigned int entries)1201 static void srp_addr_to_nodeid (
1202 unsigned int *nodeid_out,
1203 struct srp_addr *srp_addr_in,
1204 unsigned int entries)
1205 {
1206 unsigned int i;
1207
1208 for (i = 0; i < entries; i++) {
1209 nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
1210 }
1211 }
1212
srp_addr_copy_endian_convert(struct srp_addr * out,const struct srp_addr * in)1213 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1214 {
1215 int i;
1216
1217 for (i = 0; i < INTERFACE_MAX; i++) {
1218 totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
1219 }
1220 }
1221
memb_consensus_reset(struct totemsrp_instance * instance)1222 static void memb_consensus_reset (struct totemsrp_instance *instance)
1223 {
1224 instance->consensus_list_entries = 0;
1225 }
1226
memb_set_subtract(struct srp_addr * out_list,int * out_list_entries,struct srp_addr * one_list,int one_list_entries,struct srp_addr * two_list,int two_list_entries)1227 static void memb_set_subtract (
1228 struct srp_addr *out_list, int *out_list_entries,
1229 struct srp_addr *one_list, int one_list_entries,
1230 struct srp_addr *two_list, int two_list_entries)
1231 {
1232 int found = 0;
1233 int i;
1234 int j;
1235
1236 *out_list_entries = 0;
1237
1238 for (i = 0; i < one_list_entries; i++) {
1239 for (j = 0; j < two_list_entries; j++) {
1240 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1241 found = 1;
1242 break;
1243 }
1244 }
1245 if (found == 0) {
1246 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1247 *out_list_entries = *out_list_entries + 1;
1248 }
1249 found = 0;
1250 }
1251 }
1252
1253 /*
1254 * Set consensus for a specific processor
1255 */
memb_consensus_set(struct totemsrp_instance * instance,const struct srp_addr * addr)1256 static void memb_consensus_set (
1257 struct totemsrp_instance *instance,
1258 const struct srp_addr *addr)
1259 {
1260 int found = 0;
1261 int i;
1262
1263 if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
1264 return;
1265
1266 for (i = 0; i < instance->consensus_list_entries; i++) {
1267 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1268 found = 1;
1269 break; /* found entry */
1270 }
1271 }
1272 srp_addr_copy (&instance->consensus_list[i].addr, addr);
1273 instance->consensus_list[i].set = 1;
1274 if (found == 0) {
1275 instance->consensus_list_entries++;
1276 }
1277 return;
1278 }
1279
1280 /*
1281 * Is consensus set for a specific processor
1282 */
memb_consensus_isset(struct totemsrp_instance * instance,const struct srp_addr * addr)1283 static int memb_consensus_isset (
1284 struct totemsrp_instance *instance,
1285 const struct srp_addr *addr)
1286 {
1287 int i;
1288
1289 for (i = 0; i < instance->consensus_list_entries; i++) {
1290 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1291 return (instance->consensus_list[i].set);
1292 }
1293 }
1294 return (0);
1295 }
1296
1297 /*
1298 * Is consensus agreed upon based upon consensus database
1299 */
memb_consensus_agreed(struct totemsrp_instance * instance)1300 static int memb_consensus_agreed (
1301 struct totemsrp_instance *instance)
1302 {
1303 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1304 int token_memb_entries = 0;
1305 int agreed = 1;
1306 int i;
1307
1308 memb_set_subtract (token_memb, &token_memb_entries,
1309 instance->my_proc_list, instance->my_proc_list_entries,
1310 instance->my_failed_list, instance->my_failed_list_entries);
1311
1312 for (i = 0; i < token_memb_entries; i++) {
1313 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1314 agreed = 0;
1315 break;
1316 }
1317 }
1318
1319 if (agreed && instance->failed_to_recv == 1) {
1320 /*
1321 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1322 * will create single ring anyway.
1323 */
1324
1325 return (agreed);
1326 }
1327
1328 assert (token_memb_entries >= 1);
1329
1330 return (agreed);
1331 }
1332
memb_consensus_notset(struct totemsrp_instance * instance,struct srp_addr * no_consensus_list,int * no_consensus_list_entries,struct srp_addr * comparison_list,int comparison_list_entries)1333 static void memb_consensus_notset (
1334 struct totemsrp_instance *instance,
1335 struct srp_addr *no_consensus_list,
1336 int *no_consensus_list_entries,
1337 struct srp_addr *comparison_list,
1338 int comparison_list_entries)
1339 {
1340 int i;
1341
1342 *no_consensus_list_entries = 0;
1343
1344 for (i = 0; i < instance->my_proc_list_entries; i++) {
1345 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1346 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1347 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1348 }
1349 }
1350 }
1351
1352 /*
1353 * Is set1 equal to set2 Entries can be in different orders
1354 */
memb_set_equal(struct srp_addr * set1,int set1_entries,struct srp_addr * set2,int set2_entries)1355 static int memb_set_equal (
1356 struct srp_addr *set1, int set1_entries,
1357 struct srp_addr *set2, int set2_entries)
1358 {
1359 int i;
1360 int j;
1361
1362 int found = 0;
1363
1364 if (set1_entries != set2_entries) {
1365 return (0);
1366 }
1367 for (i = 0; i < set2_entries; i++) {
1368 for (j = 0; j < set1_entries; j++) {
1369 if (srp_addr_equal (&set1[j], &set2[i])) {
1370 found = 1;
1371 break;
1372 }
1373 }
1374 if (found == 0) {
1375 return (0);
1376 }
1377 found = 0;
1378 }
1379 return (1);
1380 }
1381
1382 /*
1383 * Is subset fully contained in fullset
1384 */
memb_set_subset(const struct srp_addr * subset,int subset_entries,const struct srp_addr * fullset,int fullset_entries)1385 static int memb_set_subset (
1386 const struct srp_addr *subset, int subset_entries,
1387 const struct srp_addr *fullset, int fullset_entries)
1388 {
1389 int i;
1390 int j;
1391 int found = 0;
1392
1393 if (subset_entries > fullset_entries) {
1394 return (0);
1395 }
1396 for (i = 0; i < subset_entries; i++) {
1397 for (j = 0; j < fullset_entries; j++) {
1398 if (srp_addr_equal (&subset[i], &fullset[j])) {
1399 found = 1;
1400 }
1401 }
1402 if (found == 0) {
1403 return (0);
1404 }
1405 found = 0;
1406 }
1407 return (1);
1408 }
1409 /*
1410 * merge subset into fullset taking care not to add duplicates
1411 */
memb_set_merge(const struct srp_addr * subset,int subset_entries,struct srp_addr * fullset,int * fullset_entries)1412 static void memb_set_merge (
1413 const struct srp_addr *subset, int subset_entries,
1414 struct srp_addr *fullset, int *fullset_entries)
1415 {
1416 int found = 0;
1417 int i;
1418 int j;
1419
1420 for (i = 0; i < subset_entries; i++) {
1421 for (j = 0; j < *fullset_entries; j++) {
1422 if (srp_addr_equal (&fullset[j], &subset[i])) {
1423 found = 1;
1424 break;
1425 }
1426 }
1427 if (found == 0) {
1428 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1429 *fullset_entries = *fullset_entries + 1;
1430 }
1431 found = 0;
1432 }
1433 return;
1434 }
1435
memb_set_and_with_ring_id(struct srp_addr * set1,struct memb_ring_id * set1_ring_ids,int set1_entries,struct srp_addr * set2,int set2_entries,struct memb_ring_id * old_ring_id,struct srp_addr * and,int * and_entries)1436 static void memb_set_and_with_ring_id (
1437 struct srp_addr *set1,
1438 struct memb_ring_id *set1_ring_ids,
1439 int set1_entries,
1440 struct srp_addr *set2,
1441 int set2_entries,
1442 struct memb_ring_id *old_ring_id,
1443 struct srp_addr *and,
1444 int *and_entries)
1445 {
1446 int i;
1447 int j;
1448 int found = 0;
1449
1450 *and_entries = 0;
1451
1452 for (i = 0; i < set2_entries; i++) {
1453 for (j = 0; j < set1_entries; j++) {
1454 if (srp_addr_equal (&set1[j], &set2[i])) {
1455 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1456 found = 1;
1457 }
1458 break;
1459 }
1460 }
1461 if (found) {
1462 srp_addr_copy (&and[*and_entries], &set1[j]);
1463 *and_entries = *and_entries + 1;
1464 }
1465 found = 0;
1466 }
1467 return;
1468 }
1469
1470 #ifdef CODE_COVERAGE
memb_set_print(char * string,struct srp_addr * list,int list_entries)1471 static void memb_set_print (
1472 char *string,
1473 struct srp_addr *list,
1474 int list_entries)
1475 {
1476 int i;
1477 int j;
1478 printf ("List '%s' contains %d entries:\n", string, list_entries);
1479
1480 for (i = 0; i < list_entries; i++) {
1481 printf ("Address %d with %d rings\n", i, list[i].no_addrs);
1482 for (j = 0; j < list[i].no_addrs; j++) {
1483 printf ("\tiface %d %s\n", j, totemip_print (&list[i].addr[j]));
1484 printf ("\tfamily %d\n", list[i].addr[j].family);
1485 }
1486 }
1487 }
1488 #endif
my_leave_memb_clear(struct totemsrp_instance * instance)1489 static void my_leave_memb_clear(
1490 struct totemsrp_instance *instance)
1491 {
1492 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1493 instance->my_leave_memb_entries = 0;
1494 }
1495
my_leave_memb_match(struct totemsrp_instance * instance,unsigned int nodeid)1496 static unsigned int my_leave_memb_match(
1497 struct totemsrp_instance *instance,
1498 unsigned int nodeid)
1499 {
1500 int i;
1501 unsigned int ret = 0;
1502
1503 for (i = 0; i < instance->my_leave_memb_entries; i++){
1504 if (instance->my_leave_memb_list[i] == nodeid){
1505 ret = nodeid;
1506 break;
1507 }
1508 }
1509 return ret;
1510 }
1511
my_leave_memb_set(struct totemsrp_instance * instance,unsigned int nodeid)1512 static void my_leave_memb_set(
1513 struct totemsrp_instance *instance,
1514 unsigned int nodeid)
1515 {
1516 int i, found = 0;
1517 for (i = 0; i < instance->my_leave_memb_entries; i++){
1518 if (instance->my_leave_memb_list[i] == nodeid){
1519 found = 1;
1520 break;
1521 }
1522 }
1523 if (found == 1) {
1524 return;
1525 }
1526 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1527 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1528 instance->my_leave_memb_entries++;
1529 } else {
1530 log_printf (instance->totemsrp_log_level_warning,
1531 "Cannot set LEAVE nodeid=%d", nodeid);
1532 }
1533 }
1534
1535
totemsrp_buffer_alloc(struct totemsrp_instance * instance)1536 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1537 {
1538 assert (instance != NULL);
1539 return totemrrp_buffer_alloc (instance->totemrrp_context);
1540 }
1541
totemsrp_buffer_release(struct totemsrp_instance * instance,void * ptr)1542 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1543 {
1544 assert (instance != NULL);
1545 totemrrp_buffer_release (instance->totemrrp_context, ptr);
1546 }
1547
reset_token_retransmit_timeout(struct totemsrp_instance * instance)1548 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1549 {
1550 int32_t res;
1551
1552 qb_loop_timer_del (instance->totemsrp_poll_handle,
1553 instance->timer_orf_token_retransmit_timeout);
1554 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1555 QB_LOOP_MED,
1556 instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1557 (void *)instance,
1558 timer_function_token_retransmit_timeout,
1559 &instance->timer_orf_token_retransmit_timeout);
1560 if (res != 0) {
1561 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1562 }
1563
1564 }
1565
start_merge_detect_timeout(struct totemsrp_instance * instance)1566 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1567 {
1568 int32_t res;
1569
1570 if (instance->my_merge_detect_timeout_outstanding == 0) {
1571 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1572 QB_LOOP_MED,
1573 instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1574 (void *)instance,
1575 timer_function_merge_detect_timeout,
1576 &instance->timer_merge_detect_timeout);
1577 if (res != 0) {
1578 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1579 }
1580
1581 instance->my_merge_detect_timeout_outstanding = 1;
1582 }
1583 }
1584
cancel_merge_detect_timeout(struct totemsrp_instance * instance)1585 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1586 {
1587 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1588 instance->my_merge_detect_timeout_outstanding = 0;
1589 }
1590
1591 /*
1592 * ring_state_* is used to save and restore the sort queue
1593 * state when a recovery operation fails (and enters gather)
1594 */
old_ring_state_save(struct totemsrp_instance * instance)1595 static void old_ring_state_save (struct totemsrp_instance *instance)
1596 {
1597 if (instance->old_ring_state_saved == 0) {
1598 instance->old_ring_state_saved = 1;
1599 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1600 sizeof (struct memb_ring_id));
1601 instance->old_ring_state_aru = instance->my_aru;
1602 instance->old_ring_state_high_seq_received = instance->my_high_seq_received;
1603 log_printf (instance->totemsrp_log_level_debug,
1604 "Saving state aru %x high seq received %x",
1605 instance->my_aru, instance->my_high_seq_received);
1606 }
1607 }
1608
old_ring_state_restore(struct totemsrp_instance * instance)1609 static void old_ring_state_restore (struct totemsrp_instance *instance)
1610 {
1611 instance->my_aru = instance->old_ring_state_aru;
1612 instance->my_high_seq_received = instance->old_ring_state_high_seq_received;
1613 log_printf (instance->totemsrp_log_level_debug,
1614 "Restoring instance->my_aru %x my high seq received %x",
1615 instance->my_aru, instance->my_high_seq_received);
1616 }
1617
old_ring_state_reset(struct totemsrp_instance * instance)1618 static void old_ring_state_reset (struct totemsrp_instance *instance)
1619 {
1620 log_printf (instance->totemsrp_log_level_debug,
1621 "Resetting old ring state");
1622 instance->old_ring_state_saved = 0;
1623 }
1624
reset_pause_timeout(struct totemsrp_instance * instance)1625 static void reset_pause_timeout (struct totemsrp_instance *instance)
1626 {
1627 int32_t res;
1628
1629 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1630 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1631 QB_LOOP_MED,
1632 instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1633 (void *)instance,
1634 timer_function_pause_timeout,
1635 &instance->timer_pause_timeout);
1636 if (res != 0) {
1637 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1638 }
1639 }
1640
reset_token_timeout(struct totemsrp_instance * instance)1641 static void reset_token_timeout (struct totemsrp_instance *instance) {
1642 int32_t res;
1643
1644 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1645 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1646 QB_LOOP_MED,
1647 instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1648 (void *)instance,
1649 timer_function_orf_token_timeout,
1650 &instance->timer_orf_token_timeout);
1651 if (res != 0) {
1652 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1653 }
1654 }
1655
reset_heartbeat_timeout(struct totemsrp_instance * instance)1656 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1657 int32_t res;
1658
1659 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1660 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1661 QB_LOOP_MED,
1662 instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1663 (void *)instance,
1664 timer_function_heartbeat_timeout,
1665 &instance->timer_heartbeat_timeout);
1666 if (res != 0) {
1667 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1668 }
1669 }
1670
1671
cancel_token_timeout(struct totemsrp_instance * instance)1672 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1673 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1674 }
1675
cancel_heartbeat_timeout(struct totemsrp_instance * instance)1676 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1677 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1678 }
1679
cancel_token_retransmit_timeout(struct totemsrp_instance * instance)1680 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1681 {
1682 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1683 }
1684
start_token_hold_retransmit_timeout(struct totemsrp_instance * instance)1685 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1686 {
1687 int32_t res;
1688
1689 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1690 QB_LOOP_MED,
1691 instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1692 (void *)instance,
1693 timer_function_token_hold_retransmit_timeout,
1694 &instance->timer_orf_token_hold_retransmit_timeout);
1695 if (res != 0) {
1696 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1697 }
1698 }
1699
cancel_token_hold_retransmit_timeout(struct totemsrp_instance * instance)1700 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1701 {
1702 qb_loop_timer_del (instance->totemsrp_poll_handle,
1703 instance->timer_orf_token_hold_retransmit_timeout);
1704 }
1705
memb_state_consensus_timeout_expired(struct totemsrp_instance * instance)1706 static void memb_state_consensus_timeout_expired (
1707 struct totemsrp_instance *instance)
1708 {
1709 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1710 int no_consensus_list_entries;
1711
1712 instance->stats.consensus_timeouts++;
1713 if (memb_consensus_agreed (instance)) {
1714 memb_consensus_reset (instance);
1715
1716 memb_consensus_set (instance, &instance->my_id);
1717
1718 reset_token_timeout (instance); // REVIEWED
1719 } else {
1720 memb_consensus_notset (
1721 instance,
1722 no_consensus_list,
1723 &no_consensus_list_entries,
1724 instance->my_proc_list,
1725 instance->my_proc_list_entries);
1726
1727 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1728 instance->my_failed_list, &instance->my_failed_list_entries);
1729 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1730 }
1731 }
1732
/* Forward declarations: needed by the membership timer callbacks below. */
static void memb_join_message_send (struct totemsrp_instance *instance);

static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1736
1737 /*
1738 * Timers used for various states of the membership algorithm
1739 */
timer_function_pause_timeout(void * data)1740 static void timer_function_pause_timeout (void *data)
1741 {
1742 struct totemsrp_instance *instance = data;
1743
1744 instance->pause_timestamp = qb_util_nano_current_get ();
1745 reset_pause_timeout (instance);
1746 }
1747
memb_recovery_state_token_loss(struct totemsrp_instance * instance)1748 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1749 {
1750 old_ring_state_restore (instance);
1751 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1752 instance->stats.recovery_token_lost++;
1753 }
1754
timer_function_orf_token_timeout(void * data)1755 static void timer_function_orf_token_timeout (void *data)
1756 {
1757 struct totemsrp_instance *instance = data;
1758
1759 switch (instance->memb_state) {
1760 case MEMB_STATE_OPERATIONAL:
1761 log_printf (instance->totemsrp_log_level_debug,
1762 "The token was lost in the OPERATIONAL state.");
1763 log_printf (instance->totemsrp_log_level_notice,
1764 "A processor failed, forming new configuration.");
1765 totemrrp_iface_check (instance->totemrrp_context);
1766 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1767 instance->stats.operational_token_lost++;
1768 break;
1769
1770 case MEMB_STATE_GATHER:
1771 log_printf (instance->totemsrp_log_level_debug,
1772 "The consensus timeout expired.");
1773 memb_state_consensus_timeout_expired (instance);
1774 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1775 instance->stats.gather_token_lost++;
1776 break;
1777
1778 case MEMB_STATE_COMMIT:
1779 log_printf (instance->totemsrp_log_level_debug,
1780 "The token was lost in the COMMIT state.");
1781 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1782 instance->stats.commit_token_lost++;
1783 break;
1784
1785 case MEMB_STATE_RECOVERY:
1786 log_printf (instance->totemsrp_log_level_debug,
1787 "The token was lost in the RECOVERY state.");
1788 memb_recovery_state_token_loss (instance);
1789 instance->orf_token_discard = 1;
1790 break;
1791 }
1792 }
1793
timer_function_heartbeat_timeout(void * data)1794 static void timer_function_heartbeat_timeout (void *data)
1795 {
1796 struct totemsrp_instance *instance = data;
1797 log_printf (instance->totemsrp_log_level_debug,
1798 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1799 timer_function_orf_token_timeout(data);
1800 }
1801
memb_timer_function_state_gather(void * data)1802 static void memb_timer_function_state_gather (void *data)
1803 {
1804 struct totemsrp_instance *instance = data;
1805 int32_t res;
1806
1807 switch (instance->memb_state) {
1808 case MEMB_STATE_OPERATIONAL:
1809 case MEMB_STATE_RECOVERY:
1810 assert (0); /* this should never happen */
1811 break;
1812 case MEMB_STATE_GATHER:
1813 case MEMB_STATE_COMMIT:
1814 memb_join_message_send (instance);
1815
1816 /*
1817 * Restart the join timeout
1818 `*/
1819 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1820
1821 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1822 QB_LOOP_MED,
1823 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1824 (void *)instance,
1825 memb_timer_function_state_gather,
1826 &instance->memb_timer_state_gather_join_timeout);
1827
1828 if (res != 0) {
1829 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1830 }
1831 break;
1832 }
1833 }
1834
/* Consensus timer callback: forward to the state-machine handler. */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	struct totemsrp_instance *inst = data;

	memb_state_consensus_timeout_expired (inst);
}
1840
/*
 * Move messages accumulated in the recovery sort queue during ring
 * formation into the regular sort queue, so they can be delivered to
 * the application as part of the transitional configuration.  Only
 * encapsulated messages originated on this processor's previous ring
 * (matching my_old_ring_id) are transferred.
 */
static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
{
	unsigned int i;
	struct sort_queue_item *recovery_message_item;
	struct sort_queue_item regular_message_item;
	unsigned int range = 0;
	int res;
	void *ptr;
	struct mcast *mcast;

	log_printf (instance->totemsrp_log_level_debug,
		"recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);

	/* Count of recovery messages received on the new ring */
	range = instance->my_aru - SEQNO_START_MSG;
	/*
	 * Move messages from recovery to regular sort queue
	 */
// todo should i be initialized to 0 or 1 ?
	for (i = 1; i <= range; i++) {
		res = sq_item_get (&instance->recovery_sort_queue,
			i + SEQNO_START_MSG, &ptr);
		if (res != 0) {
			/* Hole in the recovery queue - skip this sequence number */
			continue;
		}
		recovery_message_item = ptr;

		/*
		 * Convert recovery message into regular message
		 */
		mcast = recovery_message_item->mcast;
		if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
			/*
			 * Message is a recovery message encapsulated
			 * in a new ring message; strip the outer mcast
			 * header to recover the original message.
			 */
			regular_message_item.mcast =
				(struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
			regular_message_item.msg_len =
				recovery_message_item->msg_len - sizeof (struct mcast);
			mcast = regular_message_item.mcast;
		} else {
			/*
			 * TODO this case shouldn't happen
			 */
			continue;
		}

		log_printf (instance->totemsrp_log_level_debug,
			"comparing if ring id is for this processors old ring seqno %d",
			mcast->seq);

		/*
		 * Only add this message to the regular sort
		 * queue if it was originated with the same ring
		 * id as the previous ring
		 */
		if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
			if (res == 0) {
				sq_item_add (&instance->regular_sort_queue,
					&regular_message_item, mcast->seq);
				/* Track the highest old-ring seqno we now hold */
				if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
					instance->old_ring_state_high_seq_received = mcast->seq;
				}
			}
		} else {
			log_printf (instance->totemsrp_log_level_debug,
				"-not adding msg with seq no %x", mcast->seq);
		}
	}
}
1914
1915 /*
1916 * Change states in the state machine of the membership algorithm
1917 */
memb_state_operational_enter(struct totemsrp_instance * instance)1918 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1919 {
1920 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1921 int joined_list_entries = 0;
1922 unsigned int aru_save;
1923 unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1924 unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1925 unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1926 unsigned int left_list[PROCESSOR_COUNT_MAX];
1927 unsigned int i;
1928 unsigned int res;
1929 char left_node_msg[1024];
1930 char joined_node_msg[1024];
1931 char failed_node_msg[1024];
1932
1933 instance->originated_orf_token = 0;
1934
1935 memb_consensus_reset (instance);
1936
1937 old_ring_state_reset (instance);
1938
1939 deliver_messages_from_recovery_to_regular (instance);
1940
1941 log_printf (instance->totemsrp_log_level_trace,
1942 "Delivering to app %x to %x",
1943 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1944
1945 aru_save = instance->my_aru;
1946 instance->my_aru = instance->old_ring_state_aru;
1947
1948 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1949
1950 /*
1951 * Calculate joined and left list
1952 */
1953 memb_set_subtract (instance->my_left_memb_list,
1954 &instance->my_left_memb_entries,
1955 instance->my_memb_list, instance->my_memb_entries,
1956 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1957
1958 memb_set_subtract (joined_list, &joined_list_entries,
1959 instance->my_new_memb_list, instance->my_new_memb_entries,
1960 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1961
1962 /*
1963 * Install new membership
1964 */
1965 instance->my_memb_entries = instance->my_new_memb_entries;
1966 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1967 sizeof (struct srp_addr) * instance->my_memb_entries);
1968 instance->last_released = 0;
1969 instance->my_set_retrans_flg = 0;
1970
1971 /*
1972 * Inform RRP about transitional change
1973 */
1974 totemrrp_membership_changed (
1975 instance->totemrrp_context,
1976 TOTEM_CONFIGURATION_TRANSITIONAL,
1977 instance->my_trans_memb_list, instance->my_trans_memb_entries,
1978 instance->my_left_memb_list, instance->my_left_memb_entries,
1979 NULL, 0,
1980 &instance->my_ring_id);
1981 /*
1982 * Deliver transitional configuration to application
1983 */
1984 srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
1985 instance->my_left_memb_entries);
1986 srp_addr_to_nodeid (trans_memb_list_totemip,
1987 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1988 instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_TRANSITIONAL,
1989 trans_memb_list_totemip, instance->my_trans_memb_entries,
1990 left_list, instance->my_left_memb_entries,
1991 0, 0, &instance->my_ring_id);
1992 instance->waiting_trans_ack = 1;
1993 instance->totemsrp_waiting_trans_ack_cb_fn (1);
1994
1995 // TODO we need to filter to ensure we only deliver those
1996 // messages which are part of instance->my_deliver_memb
1997 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1998
1999 instance->my_aru = aru_save;
2000
2001 /*
2002 * Inform RRP about regular membership change
2003 */
2004 totemrrp_membership_changed (
2005 instance->totemrrp_context,
2006 TOTEM_CONFIGURATION_REGULAR,
2007 instance->my_new_memb_list, instance->my_new_memb_entries,
2008 NULL, 0,
2009 joined_list, joined_list_entries,
2010 &instance->my_ring_id);
2011 /*
2012 * Deliver regular configuration to application
2013 */
2014 srp_addr_to_nodeid (new_memb_list_totemip,
2015 instance->my_new_memb_list, instance->my_new_memb_entries);
2016 srp_addr_to_nodeid (joined_list_totemip, joined_list,
2017 joined_list_entries);
2018 instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_REGULAR,
2019 new_memb_list_totemip, instance->my_new_memb_entries,
2020 0, 0,
2021 joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2022
2023 /*
2024 * The recovery sort queue now becomes the regular
2025 * sort queue. It is necessary to copy the state
2026 * into the regular sort queue.
2027 */
2028 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2029 instance->my_last_aru = SEQNO_START_MSG;
2030
2031 /* When making my_proc_list smaller, ensure that the
2032 * now non-used entries are zero-ed out. There are some suspect
2033 * assert's that assume that there is always 2 entries in the list.
2034 * These fail when my_proc_list is reduced to 1 entry (and the
2035 * valid [0] entry is the same as the 'unused' [1] entry).
2036 */
2037 memset(instance->my_proc_list, 0,
2038 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2039
2040 instance->my_proc_list_entries = instance->my_new_memb_entries;
2041 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2042 sizeof (struct srp_addr) * instance->my_memb_entries);
2043
2044 instance->my_failed_list_entries = 0;
2045 /*
2046 * TODO Not exactly to spec
2047 *
2048 * At the entry to this function all messages without a gap are
2049 * deliered.
2050 *
2051 * This code throw away messages from the last gap in the sort queue
2052 * to my_high_seq_received
2053 *
2054 * What should really happen is we should deliver all messages up to
2055 * a gap, then delier the transitional configuration, then deliver
2056 * the messages between the first gap and my_high_seq_received, then
2057 * deliver a regular configuration, then deliver the regular
2058 * configuration
2059 *
2060 * Unfortunately totempg doesn't appear to like this operating mode
2061 * which needs more inspection
2062 */
2063 i = instance->my_high_seq_received + 1;
2064 do {
2065 void *ptr;
2066
2067 i -= 1;
2068 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2069 if (i == 0) {
2070 break;
2071 }
2072 } while (res);
2073
2074 instance->my_high_delivered = i;
2075
2076 for (i = 0; i <= instance->my_high_delivered; i++) {
2077 void *ptr;
2078
2079 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2080 if (res == 0) {
2081 struct sort_queue_item *regular_message;
2082
2083 regular_message = ptr;
2084 free (regular_message->mcast);
2085 }
2086 }
2087 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2088 instance->last_released = instance->my_high_delivered;
2089
2090 if (joined_list_entries) {
2091 int sptr = 0;
2092 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2093 for (i=0; i< joined_list_entries; i++) {
2094 sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %u", joined_list_totemip[i]);
2095 }
2096 }
2097 else {
2098 joined_node_msg[0] = '\0';
2099 }
2100
2101 if (instance->my_left_memb_entries) {
2102 int sptr = 0;
2103 int sptr2 = 0;
2104 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2105 for (i=0; i< instance->my_left_memb_entries; i++) {
2106 sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %u", left_list[i]);
2107 }
2108 for (i=0; i< instance->my_left_memb_entries; i++) {
2109 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2110 if (sptr2 == 0) {
2111 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2112 }
2113 sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " %u", left_list[i]);
2114 }
2115 }
2116 if (sptr2 == 0) {
2117 failed_node_msg[0] = '\0';
2118 }
2119 }
2120 else {
2121 left_node_msg[0] = '\0';
2122 failed_node_msg[0] = '\0';
2123 }
2124
2125 my_leave_memb_clear(instance);
2126
2127 log_printf (instance->totemsrp_log_level_debug,
2128 "entering OPERATIONAL state.");
2129 log_printf (instance->totemsrp_log_level_notice,
2130 "A new membership (%s:%lld) was formed. Members%s%s",
2131 totemip_print (&instance->my_ring_id.rep),
2132 instance->my_ring_id.seq,
2133 joined_node_msg,
2134 left_node_msg);
2135
2136 if (strlen(failed_node_msg)) {
2137 log_printf (instance->totemsrp_log_level_notice,
2138 "Failed to receive the leave message.%s",
2139 failed_node_msg);
2140 }
2141
2142 instance->memb_state = MEMB_STATE_OPERATIONAL;
2143
2144 instance->stats.operational_entered++;
2145 instance->stats.continuous_gather = 0;
2146
2147 instance->my_received_flg = 1;
2148
2149 reset_pause_timeout (instance);
2150
2151 /*
2152 * Save ring id information from this configuration to determine
2153 * which processors are transitioning from old regular configuration
2154 * in to new regular configuration on the next configuration change
2155 */
2156 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2157 sizeof (struct memb_ring_id));
2158
2159 return;
2160 }
2161
/*
 * Enter the GATHER state: advertise this processor's current view via
 * a join message, restart the join and consensus timers, and stop the
 * token-related timers until a new ring is formed.  gather_from
 * records which event triggered the transition (for logging/stats).
 */
static void memb_state_gather_enter (
	struct totemsrp_instance *instance,
	enum gather_state_from gather_from)
{
	int32_t res;

	/* Any ORF token received while gathering must be ignored */
	instance->orf_token_discard = 1;

	instance->originated_orf_token = 0;

	/* Ensure this processor is a member of its own proposed list */
	memb_set_merge (
		&instance->my_id, 1,
		instance->my_proc_list, &instance->my_proc_list_entries);

	memb_join_message_send (instance);

	/*
	 * Restart the join timeout
	 */
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);

	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		memb_timer_function_state_gather,
		&instance->memb_timer_state_gather_join_timeout);
	if (res != 0) {
		log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
	}

	/*
	 * Restart the consensus timeout
	 */
	qb_loop_timer_del (instance->totemsrp_poll_handle,
		instance->memb_timer_state_gather_consensus_timeout);

	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
		QB_LOOP_MED,
		instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
		(void *)instance,
		memb_timer_function_gather_consensus_timeout,
		&instance->memb_timer_state_gather_consensus_timeout);
	if (res != 0) {
		log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
	}

	/*
	 * Cancel the token loss and token retransmission timeouts
	 */
	cancel_token_retransmit_timeout (instance); // REVIEWED
	cancel_token_timeout (instance); // REVIEWED
	cancel_merge_detect_timeout (instance);

	/* Restart consensus: initially only this processor has agreed */
	memb_consensus_reset (instance);

	memb_consensus_set (instance, &instance->my_id);

	log_printf (instance->totemsrp_log_level_debug,
		"entering GATHER state from %d(%s).",
		gather_from, gsfrom_to_msg(gather_from));

	instance->memb_state = MEMB_STATE_GATHER;
	instance->stats.gather_entered++;

	if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
		/*
		 * State 3 means gather, so we are continuously gathering.
		 */
		instance->stats.continuous_gather++;
	}

	return;
}
2236
2237 static void timer_function_token_retransmit_timeout (void *data);
2238
/*
 * Completion callback invoked once the token target address has been
 * set; sends the commit token to continue forming the new ring.
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *instance = context;

	memb_state_commit_token_send (instance);
}
2247
/*
 * Enter the COMMIT state: save the old ring state, update and target
 * the commit token, adopt (and persist) the new ring id, and restart
 * the token timers.  The commit token itself is sent from the
 * target_set_completed callback once the target address is in place.
 */
static void memb_state_commit_enter (
	struct totemsrp_instance *instance)
{
	old_ring_state_save (instance);

	memb_state_commit_token_update (instance);

	memb_state_commit_token_target_set (instance);

	/* Gather-phase timers are no longer needed once committing */
	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);

	instance->memb_timer_state_gather_join_timeout = 0;

	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);

	instance->memb_timer_state_gather_consensus_timeout = 0;

	/* Adopt the ring id carried by the commit token and persist it */
	memb_ring_id_set (instance, &instance->commit_token->ring_id);
	instance->memb_ring_id_store (&instance->my_ring_id, &instance->my_id.addr[0]);

	instance->token_ring_id_seq = instance->my_ring_id.seq;

	log_printf (instance->totemsrp_log_level_debug,
		"entering COMMIT state.");

	instance->memb_state = MEMB_STATE_COMMIT;
	reset_token_retransmit_timeout (instance); // REVIEWED
	reset_token_timeout (instance); // REVIEWED

	instance->stats.commit_entered++;
	instance->stats.continuous_gather = 0;

	/*
	 * reset all flow control variables since we are starting a new ring
	 */
	instance->my_trc = 0;
	instance->my_pbl = 0;
	instance->my_cbl = 0;
	/*
	 * commit token sent after callback that token target has been set
	 */
}
2290
/*
 * Enter the RECOVERY state.
 *
 * Computes the transitional membership, determines whether old-ring
 * messages must be re-originated (encapsulated) on the new ring so
 * every transitional member can deliver them, copies those messages
 * into the retransmit queue, and resets sequencing state for the new
 * ring.  commit_token carries, for each new member, its previous ring
 * id, aru, high_delivered and received_flg.
 */
static void memb_state_recovery_enter (
	struct totemsrp_instance *instance,
	struct memb_commit_token *commit_token)
{
	int i;
	int local_received_flg = 1;
	unsigned int low_ring_aru;
	unsigned int range = 0;
	unsigned int messages_originated = 0;
	const struct srp_addr *addr;
	struct memb_commit_token_memb_entry *memb_list;
	struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];

	/* The address list and per-member state trail the commit token */
	addr = (const struct srp_addr *)commit_token->end_of_commit_token;
	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);

	log_printf (instance->totemsrp_log_level_debug,
		"entering RECOVERY state.");

	instance->orf_token_discard = 0;

	instance->my_high_ring_delivered = 0;

	sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
	cs_queue_reinit (&instance->retrans_message_queue);

	/* Start the minimum search from the highest old-ring seqno we hold */
	low_ring_aru = instance->old_ring_state_high_seq_received;

	memb_state_commit_token_send_recovery (instance, commit_token);

	instance->my_token_seq = SEQNO_START_TOKEN - 1;

	/*
	 * Build regular configuration
	 */
	totemrrp_processor_count_set (
		instance->totemrrp_context,
		commit_token->addr_entries);

	/*
	 * Build transitional configuration
	 */
	for (i = 0; i < instance->my_new_memb_entries; i++) {
		memcpy (&my_new_memb_ring_id_list[i],
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id));
	}
	/* trans membership: new members that were also in our old ring */
	memb_set_and_with_ring_id (
		instance->my_new_memb_list,
		my_new_memb_ring_id_list,
		instance->my_new_memb_entries,
		instance->my_memb_list,
		instance->my_memb_entries,
		&instance->my_old_ring_id,
		instance->my_trans_memb_list,
		&instance->my_trans_memb_entries);

	for (i = 0; i < instance->my_trans_memb_entries; i++) {
		log_printf (instance->totemsrp_log_level_debug,
			"TRANS [%d] member %s:", i, totemip_print (&instance->my_trans_memb_list[i].addr[0]));
	}
	for (i = 0; i < instance->my_new_memb_entries; i++) {
		log_printf (instance->totemsrp_log_level_debug,
			"position [%d] member %s:", i, totemip_print (&addr[i].addr[0]));
		log_printf (instance->totemsrp_log_level_debug,
			"previous ring seq %llx rep %s",
			memb_list[i].ring_id.seq,
			totemip_print (&memb_list[i].ring_id.rep));

		log_printf (instance->totemsrp_log_level_debug,
			"aru %x high delivered %x received flag %d",
			memb_list[i].aru,
			memb_list[i].high_delivered,
			memb_list[i].received_flg);

	//	assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
	}
	/*
	 * Determine if any received flag is false
	 */
	for (i = 0; i < commit_token->addr_entries; i++) {
		if (memb_set_subset (&instance->my_new_memb_list[i], 1,
			instance->my_trans_memb_list, instance->my_trans_memb_entries) &&

			memb_list[i].received_flg == 0) {
			/*
			 * Some transitional member is missing messages, so the
			 * whole transitional set becomes the delivery set.
			 */
			instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
			memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
				sizeof (struct srp_addr) * instance->my_trans_memb_entries);
			local_received_flg = 0;
			break;
		}
	}
	if (local_received_flg == 1) {
		goto no_originate;
	} /* Else originate messages if we should */

	/*
	 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
	 */
	for (i = 0; i < commit_token->addr_entries; i++) {
		if (memb_set_subset (&instance->my_new_memb_list[i], 1,
			instance->my_deliver_memb_list,
			instance->my_deliver_memb_entries) &&

		memcmp (&instance->my_old_ring_id,
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {

				low_ring_aru = memb_list[i].aru;
			}
			if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
				instance->my_high_ring_delivered = memb_list[i].high_delivered;
			}
		}
	}

	/*
	 * Copy all old ring messages to instance->retrans_message_queue
	 */
	range = instance->old_ring_state_high_seq_received - low_ring_aru;
	if (range == 0) {
		/*
		 * No messages to copy
		 */
		goto no_originate;
	}
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);

	log_printf (instance->totemsrp_log_level_debug,
		"copying all old ring messages from %x-%x.",
		low_ring_aru + 1, instance->old_ring_state_high_seq_received);

	for (i = 1; i <= range; i++) {
		struct sort_queue_item *sort_queue_item;
		struct message_item message_item;
		void *ptr;
		int res;

		res = sq_item_get (&instance->regular_sort_queue,
			low_ring_aru + i, &ptr);
		if (res != 0) {
			/* We never had this message; another member originates it */
			continue;
		}
		sort_queue_item = ptr;
		messages_originated++;
		memset (&message_item, 0, sizeof (struct message_item));
	// TODO	LEAK
		message_item.mcast = totemsrp_buffer_alloc (instance);
		assert (message_item.mcast);
		/* Encapsulate the old-ring message inside a new-ring mcast */
		message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
		srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
		message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
		message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
		assert (message_item.mcast->header.nodeid);
		message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
		memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id));
		message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
		memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
			sort_queue_item->mcast,
			sort_queue_item->msg_len);
		cs_queue_item_add (&instance->retrans_message_queue, &message_item);
	}
	log_printf (instance->totemsrp_log_level_debug,
		"Originated %d messages in RECOVERY.", messages_originated);
	goto originated;

no_originate:
	log_printf (instance->totemsrp_log_level_debug,
		"Did not need to originate any messages in recovery.");

originated:
	/* Reset sequencing for the new ring */
	instance->my_aru = SEQNO_START_MSG;
	instance->my_aru_count = 0;
	instance->my_seq_unchanged = 0;
	instance->my_high_seq_received = SEQNO_START_MSG;
	instance->my_install_seq = SEQNO_START_MSG;
	instance->last_released = SEQNO_START_MSG;

	reset_token_timeout (instance); // REVIEWED
	reset_token_retransmit_timeout (instance); // REVIEWED

	instance->memb_state = MEMB_STATE_RECOVERY;
	instance->stats.recovery_entered++;
	instance->stats.continuous_gather = 0;

	return;
}
2481
/*
 * External event notification entry point; currently reacts to any
 * event by cancelling a held token (type and value are unused here).
 */
void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
{
	struct totemsrp_instance *instance = srp_context;

	token_hold_cancel_send (instance);
}
2490
totemsrp_mcast(void * srp_context,struct iovec * iovec,unsigned int iov_len,int guarantee)2491 int totemsrp_mcast (
2492 void *srp_context,
2493 struct iovec *iovec,
2494 unsigned int iov_len,
2495 int guarantee)
2496 {
2497 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2498 int i;
2499 struct message_item message_item;
2500 char *addr;
2501 unsigned int addr_idx;
2502 struct cs_queue *queue_use;
2503
2504 if (instance->waiting_trans_ack) {
2505 queue_use = &instance->new_message_queue_trans;
2506 } else {
2507 queue_use = &instance->new_message_queue;
2508 }
2509
2510 if (cs_queue_is_full (queue_use)) {
2511 log_printf (instance->totemsrp_log_level_debug, "queue full");
2512 return (-1);
2513 }
2514
2515 memset (&message_item, 0, sizeof (struct message_item));
2516
2517 /*
2518 * Allocate pending item
2519 */
2520 message_item.mcast = totemsrp_buffer_alloc (instance);
2521 if (message_item.mcast == 0) {
2522 goto error_mcast;
2523 }
2524
2525 /*
2526 * Set mcast header
2527 */
2528 memset(message_item.mcast, 0, sizeof (struct mcast));
2529 message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2530 message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2531 message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
2532 message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2533 assert (message_item.mcast->header.nodeid);
2534
2535 message_item.mcast->guarantee = guarantee;
2536 srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2537
2538 addr = (char *)message_item.mcast;
2539 addr_idx = sizeof (struct mcast);
2540 for (i = 0; i < iov_len; i++) {
2541 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2542 addr_idx += iovec[i].iov_len;
2543 }
2544
2545 message_item.msg_len = addr_idx;
2546
2547 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2548 instance->stats.mcast_tx++;
2549 cs_queue_item_add (queue_use, &message_item);
2550
2551 return (0);
2552
2553 error_mcast:
2554 return (-1);
2555 }
2556
2557 /*
2558 * Determine if there is room to queue a new message
2559 */
totemsrp_avail(void * srp_context)2560 int totemsrp_avail (void *srp_context)
2561 {
2562 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2563 int avail;
2564 struct cs_queue *queue_use;
2565
2566 if (instance->waiting_trans_ack) {
2567 queue_use = &instance->new_message_queue_trans;
2568 } else {
2569 queue_use = &instance->new_message_queue;
2570 }
2571 cs_queue_avail (queue_use, &avail);
2572
2573 return (avail);
2574 }
2575
2576 /*
2577 * ORF Token Management
2578 */
2579 /*
2580 * Recast message to mcast group if it is available
2581 */
orf_token_remcast(struct totemsrp_instance * instance,int seq)2582 static int orf_token_remcast (
2583 struct totemsrp_instance *instance,
2584 int seq)
2585 {
2586 struct sort_queue_item *sort_queue_item;
2587 int res;
2588 void *ptr;
2589
2590 struct sq *sort_queue;
2591
2592 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2593 sort_queue = &instance->recovery_sort_queue;
2594 } else {
2595 sort_queue = &instance->regular_sort_queue;
2596 }
2597
2598 res = sq_in_range (sort_queue, seq);
2599 if (res == 0) {
2600 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2601 return (-1);
2602 }
2603
2604 /*
2605 * Get RTR item at seq, if not available, return
2606 */
2607 res = sq_item_get (sort_queue, seq, &ptr);
2608 if (res != 0) {
2609 return -1;
2610 }
2611
2612 sort_queue_item = ptr;
2613
2614 totemrrp_mcast_noflush_send (
2615 instance->totemrrp_context,
2616 sort_queue_item->mcast,
2617 sort_queue_item->msg_len);
2618
2619 return (0);
2620 }
2621
2622
/*
 * Free all freeable messages from ring
 *
 * Releases sort-queue entries that every ring member has received
 * (per token_aru, the group all-received-up-to carried on the token)
 * and that this processor has both acknowledged and delivered.
 */
static void messages_free (
	struct totemsrp_instance *instance,
	unsigned int token_aru)
{
	struct sort_queue_item *regular_message;
	unsigned int i;
	int res;
	int log_release = 0;
	unsigned int release_to;
	unsigned int range = 0;

	/*
	 * Release no further than the minimum of the group aru, our own
	 * last aru and our high-delivered mark (sq_lt_compare handles
	 * sequence-number wraparound).
	 */
	release_to = token_aru;
	if (sq_lt_compare (instance->my_last_aru, release_to)) {
		release_to = instance->my_last_aru;
	}
	if (sq_lt_compare (instance->my_high_delivered, release_to)) {
		release_to = instance->my_high_delivered;
	}

	/*
	 * Ensure we dont try release before an already released point
	 */
	if (sq_lt_compare (release_to, instance->last_released)) {
		return;
	}

	range = release_to - instance->last_released;
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);

	/*
	 * Release retransmit list items if group aru indicates they are transmitted
	 */
	for (i = 1; i <= range; i++) {
		void *ptr;

		res = sq_item_get (&instance->regular_sort_queue,
			instance->last_released + i, &ptr);
		if (res == 0) {
			regular_message = ptr;
			totemsrp_buffer_release (instance, regular_message->mcast);
		}
		sq_items_release (&instance->regular_sort_queue,
			instance->last_released + i);

		log_release = 1;
	}
	instance->last_released += range;

	if (log_release) {
		log_printf (instance->totemsrp_log_level_trace,
			"releasing messages up to and including %x", release_to);
	}
}
2679
/*
 * Advance my_aru (all-received-up-to) across the contiguous run of
 * messages present in the active sort queue, stopping at the first
 * missing sequence number.
 */
static void update_aru (
	struct totemsrp_instance *instance)
{
	unsigned int i;
	int res;
	struct sq *sort_queue;
	unsigned int range;
	unsigned int my_aru_saved = 0;

	/* Recovery state tracks aru against its own sort queue */
	if (instance->memb_state == MEMB_STATE_RECOVERY) {
		sort_queue = &instance->recovery_sort_queue;
	} else {
		sort_queue = &instance->regular_sort_queue;
	}

	range = instance->my_high_seq_received - instance->my_aru;

	my_aru_saved = instance->my_aru;
	for (i = 1; i <= range; i++) {

		void *ptr;

		res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
		/*
		 * If hole, stop updating aru
		 */
		if (res != 0) {
			break;
		}
	}
	/*
	 * The loop exits with i one past the last contiguous message
	 * (or i == 1 when range == 0), so i - 1 items were present.
	 */
	instance->my_aru += i - 1;
}
2712
/*
 * Multicasts pending messages onto the ring (requires orf_token possession)
 *
 * Drains up to fcc_mcasts_allowed messages from the active pending
 * queue: each message is stamped with the next token sequence number
 * and the current ring id, added to the active sort queue, and
 * multicast.  Returns the number of messages multicast on this visit.
 */
static int orf_token_mcast (
	struct totemsrp_instance *instance,
	struct orf_token *token,
	int fcc_mcasts_allowed)
{
	struct message_item *message_item = 0;
	struct cs_queue *mcast_queue;
	struct sq *sort_queue;
	struct sort_queue_item sort_queue_item;
	struct mcast *mcast;
	unsigned int fcc_mcast_current;

	if (instance->memb_state == MEMB_STATE_RECOVERY) {
		/* In recovery, drain the re-originated old ring messages */
		mcast_queue = &instance->retrans_message_queue;
		sort_queue = &instance->recovery_sort_queue;
		reset_token_retransmit_timeout (instance); // REVIEWED
	} else {
		if (instance->waiting_trans_ack) {
			mcast_queue = &instance->new_message_queue_trans;
		} else {
			mcast_queue = &instance->new_message_queue;
		}

		sort_queue = &instance->regular_sort_queue;
	}

	/*
	 * NOTE(review): fcc_mcast_current (unsigned) is compared against
	 * fcc_mcasts_allowed (int); callers presumably never pass a
	 * negative allowance - confirm.
	 */
	for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
		if (cs_queue_is_empty (mcast_queue)) {
			break;
		}
		message_item = (struct message_item *)cs_queue_item_get (mcast_queue);

		/* Stamp with the next ring-wide sequence number */
		message_item->mcast->seq = ++token->seq;
		message_item->mcast->this_seqno = instance->global_seqno++;

		/*
		 * Build IO vector
		 */
		memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
		sort_queue_item.mcast = message_item->mcast;
		sort_queue_item.msg_len = message_item->msg_len;

		mcast = sort_queue_item.mcast;

		memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));

		/*
		 * Add message to retransmit queue
		 */
		sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);

		totemrrp_mcast_noflush_send (
			instance->totemrrp_context,
			message_item->mcast,
			message_item->msg_len);

		/*
		 * Delete item from pending queue
		 */
		cs_queue_item_remove (mcast_queue);

		/*
		 * If messages mcasted, deliver any new messages to totempg
		 */
		instance->my_high_seq_received = token->seq;
	}

	update_aru (instance);

	/*
	 * Return 1 if more messages are available for single node clusters
	 */
	return (fcc_mcast_current);
}
2790
/*
 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
 * Modify's orf_token's rtr to include retransmits required by this process
 *
 * Returns the number of messages remulticast on this token visit;
 * *fcc_allowed is reduced by that count.
 */
static int orf_token_rtr (
	struct totemsrp_instance *instance,
	struct orf_token *orf_token,
	unsigned int *fcc_allowed)
{
	unsigned int res;
	unsigned int i, j;
	unsigned int found;
	struct sq *sort_queue;
	struct rtr_item *rtr_list;
	unsigned int range = 0;
	char retransmit_msg[1024];
	char value[64];

	/* Recovery state retransmits from its own sort queue */
	if (instance->memb_state == MEMB_STATE_RECOVERY) {
		sort_queue = &instance->recovery_sort_queue;
	} else {
		sort_queue = &instance->regular_sort_queue;
	}

	rtr_list = &orf_token->rtr_list[0];

	/*
	 * NOTE(review): retransmit_msg is a fixed 1024-byte buffer filled
	 * with unchecked strcat/sprintf; this assumes the formatted RTR
	 * list (bounded by RETRANSMIT_ENTRIES_MAX entries) always fits -
	 * confirm against the list bound.
	 */
	strcpy (retransmit_msg, "Retransmit List: ");
	if (orf_token->rtr_list_entries) {
		log_printf (instance->totemsrp_log_level_debug,
			"Retransmit List %d", orf_token->rtr_list_entries);
		for (i = 0; i < orf_token->rtr_list_entries; i++) {
			sprintf (value, "%x ", rtr_list[i].seq);
			strcat (retransmit_msg, value);
		}
		strcat (retransmit_msg, "");
		log_printf (instance->totemsrp_log_level_notice,
			"%s", retransmit_msg);
	}

	/*
	 * Retransmit messages on orf_token's RTR list from RTR queue
	 */
	for (instance->fcc_remcast_current = 0, i = 0;
		instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {

		/*
		 * If this retransmit request isn't from this configuration,
		 * try next rtr entry
		 */
		if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id)) != 0) {

			i += 1;
			continue;
		}

		res = orf_token_remcast (instance, rtr_list[i].seq);
		if (res == 0) {
			/*
			 * Multicasted message, so no need to copy to new retransmit list;
			 * compact the list over the satisfied entry.
			 */
			orf_token->rtr_list_entries -= 1;
			assert (orf_token->rtr_list_entries >= 0);
			memmove (&rtr_list[i], &rtr_list[i + 1],
				sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));

			instance->stats.mcast_retx++;
			instance->fcc_remcast_current++;
		} else {
			i += 1;
		}
	}
	*fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;

	/*
	 * Add messages to retransmit to RTR list
	 * but only retry if there is room in the retransmit list
	 */

	range = orf_token->seq - instance->my_aru;
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);

	for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
		(i <= range); i++) {

		/*
		 * Ensure message is within the sort queue range
		 */
		res = sq_in_range (sort_queue, instance->my_aru + i);
		if (res == 0) {
			break;
		}

		/*
		 * Find if a message is missing from this processor
		 */
		res = sq_item_inuse (sort_queue, instance->my_aru + i);
		if (res == 0) {
			/*
			 * Determine how many times we have missed receiving
			 * this sequence number. sq_item_miss_count increments
			 * a counter for the sequence number. The miss count
			 * will be returned and compared. This allows time for
			 * delayed multicast messages to be received before
			 * declaring the message is missing and requesting a
			 * retransmit.
			 */
			res = sq_item_miss_count (sort_queue, instance->my_aru + i);
			if (res < instance->totem_config->miss_count_const) {
				continue;
			}

			/*
			 * Determine if missing message is already in retransmit list
			 */
			found = 0;
			for (j = 0; j < orf_token->rtr_list_entries; j++) {
				if (instance->my_aru + i == rtr_list[j].seq) {
					found = 1;
				}
			}
			if (found == 0) {
				/*
				 * Missing message not found in current retransmit list so add it
				 */
				memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
					&instance->my_ring_id, sizeof (struct memb_ring_id));
				rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
				orf_token->rtr_list_entries++;
			}
		}
	}
	return (instance->fcc_remcast_current);
}
2925
token_retransmit(struct totemsrp_instance * instance)2926 static void token_retransmit (struct totemsrp_instance *instance)
2927 {
2928 totemrrp_token_send (instance->totemrrp_context,
2929 instance->orf_token_retransmit,
2930 instance->orf_token_retransmit_size);
2931 }
2932
2933 /*
2934 * Retransmit the regular token if no mcast or token has
2935 * been received in retransmit token period retransmit
2936 * the token to the next processor
2937 */
timer_function_token_retransmit_timeout(void * data)2938 static void timer_function_token_retransmit_timeout (void *data)
2939 {
2940 struct totemsrp_instance *instance = data;
2941
2942 switch (instance->memb_state) {
2943 case MEMB_STATE_GATHER:
2944 break;
2945 case MEMB_STATE_COMMIT:
2946 case MEMB_STATE_OPERATIONAL:
2947 case MEMB_STATE_RECOVERY:
2948 token_retransmit (instance);
2949 reset_token_retransmit_timeout (instance); // REVIEWED
2950 break;
2951 }
2952 }
2953
timer_function_token_hold_retransmit_timeout(void * data)2954 static void timer_function_token_hold_retransmit_timeout (void *data)
2955 {
2956 struct totemsrp_instance *instance = data;
2957
2958 switch (instance->memb_state) {
2959 case MEMB_STATE_GATHER:
2960 break;
2961 case MEMB_STATE_COMMIT:
2962 break;
2963 case MEMB_STATE_OPERATIONAL:
2964 case MEMB_STATE_RECOVERY:
2965 token_retransmit (instance);
2966 break;
2967 }
2968 }
2969
timer_function_merge_detect_timeout(void * data)2970 static void timer_function_merge_detect_timeout(void *data)
2971 {
2972 struct totemsrp_instance *instance = data;
2973
2974 instance->my_merge_detect_timeout_outstanding = 0;
2975
2976 switch (instance->memb_state) {
2977 case MEMB_STATE_OPERATIONAL:
2978 if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
2979 memb_merge_detect_transmit (instance);
2980 }
2981 break;
2982 case MEMB_STATE_GATHER:
2983 case MEMB_STATE_COMMIT:
2984 case MEMB_STATE_RECOVERY:
2985 break;
2986 }
2987 }
2988
2989 /*
2990 * Send orf_token to next member (requires orf_token)
2991 */
token_send(struct totemsrp_instance * instance,struct orf_token * orf_token,int forward_token)2992 static int token_send (
2993 struct totemsrp_instance *instance,
2994 struct orf_token *orf_token,
2995 int forward_token)
2996 {
2997 int res = 0;
2998 unsigned int orf_token_size;
2999
3000 orf_token_size = sizeof (struct orf_token) +
3001 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3002
3003 orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
3004 memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3005 instance->orf_token_retransmit_size = orf_token_size;
3006 assert (orf_token->header.nodeid);
3007
3008 if (forward_token == 0) {
3009 return (0);
3010 }
3011
3012 totemrrp_token_send (instance->totemrrp_context,
3013 orf_token,
3014 orf_token_size);
3015
3016 return (res);
3017 }
3018
token_hold_cancel_send(struct totemsrp_instance * instance)3019 static int token_hold_cancel_send (struct totemsrp_instance *instance)
3020 {
3021 struct token_hold_cancel token_hold_cancel;
3022
3023 /*
3024 * Only cancel if the token is currently held
3025 */
3026 if (instance->my_token_held == 0) {
3027 return (0);
3028 }
3029 instance->my_token_held = 0;
3030
3031 /*
3032 * Build message
3033 */
3034 token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
3035 token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
3036 token_hold_cancel.header.encapsulated = 0;
3037 token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
3038 memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3039 sizeof (struct memb_ring_id));
3040 assert (token_hold_cancel.header.nodeid);
3041
3042 instance->stats.token_hold_cancel_tx++;
3043
3044 totemrrp_mcast_flush_send (instance->totemrrp_context, &token_hold_cancel,
3045 sizeof (struct token_hold_cancel));
3046
3047 return (0);
3048 }
3049
orf_token_send_initial(struct totemsrp_instance * instance)3050 static int orf_token_send_initial (struct totemsrp_instance *instance)
3051 {
3052 struct orf_token orf_token;
3053 int res;
3054
3055 orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3056 orf_token.header.endian_detector = ENDIAN_LOCAL;
3057 orf_token.header.encapsulated = 0;
3058 orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
3059 assert (orf_token.header.nodeid);
3060 orf_token.seq = SEQNO_START_MSG;
3061 orf_token.token_seq = SEQNO_START_TOKEN;
3062 orf_token.retrans_flg = 1;
3063 instance->my_set_retrans_flg = 1;
3064 instance->stats.orf_token_tx++;
3065
3066 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3067 orf_token.retrans_flg = 0;
3068 instance->my_set_retrans_flg = 0;
3069 } else {
3070 orf_token.retrans_flg = 1;
3071 instance->my_set_retrans_flg = 1;
3072 }
3073
3074 orf_token.aru = 0;
3075 orf_token.aru = SEQNO_START_MSG - 1;
3076 orf_token.aru_addr = instance->my_id.addr[0].nodeid;
3077
3078 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3079 orf_token.fcc = 0;
3080 orf_token.backlog = 0;
3081
3082 orf_token.rtr_list_entries = 0;
3083
3084 res = token_send (instance, &orf_token, 1);
3085
3086 return (res);
3087 }
3088
/*
 * Record this processor's membership state into the commit token as
 * it passes through, then recompute the received flags of all token
 * entries that share our old ring id, and advance memb_index so the
 * token addresses the next member.
 */
static void memb_state_commit_token_update (
	struct totemsrp_instance *instance)
{
	struct srp_addr *addr;
	struct memb_commit_token_memb_entry *memb_list;
	unsigned int high_aru;
	unsigned int i;

	/*
	 * The srp_addr list and the per-member entry list are laid out
	 * back to back after the fixed commit token header.
	 */
	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);

	/*
	 * Adopt the token's address list as our new membership list
	 */
	memcpy (instance->my_new_memb_list, addr,
		sizeof (struct srp_addr) * instance->commit_token->addr_entries);

	instance->my_new_memb_entries = instance->commit_token->addr_entries;

	/*
	 * Fill in our own slot (memb_index) with the state of the old ring
	 */
	memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
		&instance->my_old_ring_id, sizeof (struct memb_ring_id));

	memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
	/*
	 * TODO high delivered is really instance->my_aru, but with safe this
	 * could change?
	 */
	instance->my_received_flg =
		(instance->my_aru == instance->my_high_seq_received);

	memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;

	memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
	/*
	 * find high aru up to current memb_index for all matching ring ids
	 * if any ring id matching memb_index has aru less then high aru set
	 * received flag for that entry to false
	 */
	high_aru = memb_list[instance->commit_token->memb_index].aru;
	for (i = 0; i <= instance->commit_token->memb_index; i++) {
		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			if (sq_lt_compare (high_aru, memb_list[i].aru)) {
				high_aru = memb_list[i].aru;
			}
		}
	}

	/*
	 * Second pass: any matching entry that is behind the high aru
	 * has not received everything; clear its received flag (and our
	 * own cached copy if the entry is ours).
	 */
	for (i = 0; i <= instance->commit_token->memb_index; i++) {
		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
			&memb_list[i].ring_id,
			sizeof (struct memb_ring_id)) == 0) {

			if (sq_lt_compare (memb_list[i].aru, high_aru)) {
				memb_list[i].received_flg = 0;
				if (i == instance->commit_token->memb_index) {
					instance->my_received_flg = 0;
				}
			}
		}
	}

	/*
	 * Stamp our nodeid and move the token on to the next member
	 */
	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
	instance->commit_token->memb_index += 1;
	assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
	assert (instance->commit_token->header.nodeid);
}
3155
memb_state_commit_token_target_set(struct totemsrp_instance * instance)3156 static void memb_state_commit_token_target_set (
3157 struct totemsrp_instance *instance)
3158 {
3159 struct srp_addr *addr;
3160 unsigned int i;
3161
3162 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3163
3164 for (i = 0; i < instance->totem_config->interface_count; i++) {
3165 totemrrp_token_target_set (
3166 instance->totemrrp_context,
3167 &addr[instance->commit_token->memb_index %
3168 instance->commit_token->addr_entries].addr[i],
3169 i);
3170 }
3171 }
3172
memb_state_commit_token_send_recovery(struct totemsrp_instance * instance,struct memb_commit_token * commit_token)3173 static int memb_state_commit_token_send_recovery (
3174 struct totemsrp_instance *instance,
3175 struct memb_commit_token *commit_token)
3176 {
3177 unsigned int commit_token_size;
3178
3179 commit_token->token_seq++;
3180 commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3181 commit_token_size = sizeof (struct memb_commit_token) +
3182 ((sizeof (struct srp_addr) +
3183 sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3184 /*
3185 * Make a copy for retransmission if necessary
3186 */
3187 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3188 instance->orf_token_retransmit_size = commit_token_size;
3189
3190 instance->stats.memb_commit_token_tx++;
3191
3192 totemrrp_token_send (instance->totemrrp_context,
3193 commit_token,
3194 commit_token_size);
3195
3196 /*
3197 * Request retransmission of the commit token in case it is lost
3198 */
3199 reset_token_retransmit_timeout (instance);
3200 return (0);
3201 }
3202
memb_state_commit_token_send(struct totemsrp_instance * instance)3203 static int memb_state_commit_token_send (
3204 struct totemsrp_instance *instance)
3205 {
3206 unsigned int commit_token_size;
3207
3208 instance->commit_token->token_seq++;
3209 instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3210 commit_token_size = sizeof (struct memb_commit_token) +
3211 ((sizeof (struct srp_addr) +
3212 sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3213 /*
3214 * Make a copy for retransmission if necessary
3215 */
3216 memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3217 instance->orf_token_retransmit_size = commit_token_size;
3218
3219 instance->stats.memb_commit_token_tx++;
3220
3221 totemrrp_token_send (instance->totemrrp_context,
3222 instance->commit_token,
3223 commit_token_size);
3224
3225 /*
3226 * Request retransmission of the commit token in case it is lost
3227 */
3228 reset_token_retransmit_timeout (instance);
3229 return (0);
3230 }
3231
3232
memb_lowest_in_config(struct totemsrp_instance * instance)3233 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3234 {
3235 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3236 int token_memb_entries = 0;
3237 int i;
3238 struct totem_ip_address *lowest_addr;
3239
3240 memb_set_subtract (token_memb, &token_memb_entries,
3241 instance->my_proc_list, instance->my_proc_list_entries,
3242 instance->my_failed_list, instance->my_failed_list_entries);
3243
3244 /*
3245 * find representative by searching for smallest identifier
3246 */
3247 assert(token_memb_entries > 0);
3248
3249 lowest_addr = &token_memb[0].addr[0];
3250 for (i = 1; i < token_memb_entries; i++) {
3251 if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
3252 totemip_copy (lowest_addr, &token_memb[i].addr[0]);
3253 }
3254 }
3255 return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
3256 }
3257
srp_addr_compare(const void * a,const void * b)3258 static int srp_addr_compare (const void *a, const void *b)
3259 {
3260 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3261 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3262
3263 return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
3264 }
3265
memb_state_commit_token_create(struct totemsrp_instance * instance)3266 static void memb_state_commit_token_create (
3267 struct totemsrp_instance *instance)
3268 {
3269 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3270 struct srp_addr *addr;
3271 struct memb_commit_token_memb_entry *memb_list;
3272 int token_memb_entries = 0;
3273
3274 log_printf (instance->totemsrp_log_level_debug,
3275 "Creating commit token because I am the rep.");
3276
3277 memb_set_subtract (token_memb, &token_memb_entries,
3278 instance->my_proc_list, instance->my_proc_list_entries,
3279 instance->my_failed_list, instance->my_failed_list_entries);
3280
3281 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3282 instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
3283 instance->commit_token->header.endian_detector = ENDIAN_LOCAL;
3284 instance->commit_token->header.encapsulated = 0;
3285 instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3286 assert (instance->commit_token->header.nodeid);
3287
3288 totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
3289
3290 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3291
3292 /*
3293 * This qsort is necessary to ensure the commit token traverses
3294 * the ring in the proper order
3295 */
3296 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3297 srp_addr_compare);
3298
3299 instance->commit_token->memb_index = 0;
3300 instance->commit_token->addr_entries = token_memb_entries;
3301
3302 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3303 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3304
3305 memcpy (addr, token_memb,
3306 token_memb_entries * sizeof (struct srp_addr));
3307 memset (memb_list, 0,
3308 sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3309 }
3310
/*
 * Multicast a memb_join message carrying our current proc (joined)
 * and failed processor lists.  The message is assembled in a stack
 * buffer: fixed memb_join header followed by proc_list_entries then
 * failed_list_entries srp_addr records.  Oversized lists are logged
 * and the send is skipped.
 */
static void memb_join_message_send (struct totemsrp_instance *instance)
{
	char memb_join_data[40000];
	struct memb_join *memb_join = (struct memb_join *)memb_join_data;
	char *addr;
	unsigned int addr_idx;
	size_t msg_len;

	memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
	memb_join->header.endian_detector = ENDIAN_LOCAL;
	memb_join->header.encapsulated = 0;
	memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
	assert (memb_join->header.nodeid);

	/*
	 * Refuse to build the message if the variable-length lists
	 * would overflow the stack buffer
	 */
	msg_len = sizeof(struct memb_join) +
		((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));

	if (msg_len > sizeof(memb_join_data)) {
		log_printf (instance->totemsrp_log_level_error,
			"memb_join_message too long. Ignoring message.");

		return ;
	}

	memb_join->ring_seq = instance->my_ring_id.seq;
	memb_join->proc_list_entries = instance->my_proc_list_entries;
	memb_join->failed_list_entries = instance->my_failed_list_entries;
	srp_addr_copy (&memb_join->system_from, &instance->my_id);

	/*
	 * This mess adds the joined and failed processor lists into the join
	 * message
	 */
	addr = (char *)memb_join;
	addr_idx = sizeof (struct memb_join);
	memcpy (&addr[addr_idx],
		instance->my_proc_list,
		instance->my_proc_list_entries *
			sizeof (struct srp_addr));
	addr_idx +=
		instance->my_proc_list_entries *
		sizeof (struct srp_addr);
	memcpy (&addr[addr_idx],
		instance->my_failed_list,
		instance->my_failed_list_entries *
			sizeof (struct srp_addr));
	addr_idx +=
		instance->my_failed_list_entries *
		sizeof (struct srp_addr);

	/*
	 * Random delay bounded by send_join_timeout milliseconds —
	 * presumably staggers simultaneous joins from multiple nodes
	 */
	if (instance->totem_config->send_join_timeout) {
		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
	}

	instance->stats.memb_join_tx++;

	totemrrp_mcast_flush_send (
		instance->totemrrp_context,
		memb_join,
		addr_idx);
}
3372
/*
 * Multicast a join message announcing our own departure: we are
 * merged into the failed list and subtracted from the active member
 * list, and LEAVE_DUMMY_NODEID is stamped into the header and
 * system_from so receivers can distinguish this from a normal join.
 */
static void memb_leave_message_send (struct totemsrp_instance *instance)
{
	char memb_join_data[40000];
	struct memb_join *memb_join = (struct memb_join *)memb_join_data;
	char *addr;
	unsigned int addr_idx;
	int active_memb_entries;
	struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
	size_t msg_len;

	log_printf (instance->totemsrp_log_level_debug,
		"sending join/leave message");

	/*
	 * add us to the failed list, and remove us from
	 * the members list
	 */
	memb_set_merge(
		&instance->my_id, 1,
		instance->my_failed_list, &instance->my_failed_list_entries);

	memb_set_subtract (active_memb, &active_memb_entries,
		instance->my_proc_list, instance->my_proc_list_entries,
		&instance->my_id, 1);

	/*
	 * Refuse to build the message if the variable-length lists
	 * would overflow the stack buffer
	 */
	msg_len = sizeof(struct memb_join) +
		((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));

	if (msg_len > sizeof(memb_join_data)) {
		log_printf (instance->totemsrp_log_level_error,
			"memb_leave message too long. Ignoring message.");

		return ;
	}

	memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
	memb_join->header.endian_detector = ENDIAN_LOCAL;
	memb_join->header.encapsulated = 0;
	memb_join->header.nodeid = LEAVE_DUMMY_NODEID;

	memb_join->ring_seq = instance->my_ring_id.seq;
	memb_join->proc_list_entries = active_memb_entries;
	memb_join->failed_list_entries = instance->my_failed_list_entries;
	srp_addr_copy (&memb_join->system_from, &instance->my_id);
	memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;

	// TODO: CC Maybe use the actual join send routine.
	/*
	 * This mess adds the joined and failed processor lists into the join
	 * message
	 */
	addr = (char *)memb_join;
	addr_idx = sizeof (struct memb_join);
	memcpy (&addr[addr_idx],
		active_memb,
		active_memb_entries *
			sizeof (struct srp_addr));
	addr_idx +=
		active_memb_entries *
		sizeof (struct srp_addr);
	memcpy (&addr[addr_idx],
		instance->my_failed_list,
		instance->my_failed_list_entries *
			sizeof (struct srp_addr));
	addr_idx +=
		instance->my_failed_list_entries *
		sizeof (struct srp_addr);


	/*
	 * Random delay bounded by send_join_timeout milliseconds —
	 * presumably staggers simultaneous sends from multiple nodes
	 */
	if (instance->totem_config->send_join_timeout) {
		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
	}
	instance->stats.memb_join_tx++;

	totemrrp_mcast_flush_send (
		instance->totemrrp_context,
		memb_join,
		addr_idx);
}
3452
memb_merge_detect_transmit(struct totemsrp_instance * instance)3453 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3454 {
3455 struct memb_merge_detect memb_merge_detect;
3456
3457 memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
3458 memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
3459 memb_merge_detect.header.encapsulated = 0;
3460 memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
3461 srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3462 memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3463 sizeof (struct memb_ring_id));
3464 assert (memb_merge_detect.header.nodeid);
3465
3466 instance->stats.memb_merge_detect_tx++;
3467 totemrrp_mcast_flush_send (instance->totemrrp_context,
3468 &memb_merge_detect,
3469 sizeof (struct memb_merge_detect));
3470 }
3471
memb_ring_id_set(struct totemsrp_instance * instance,const struct memb_ring_id * ring_id)3472 static void memb_ring_id_set (
3473 struct totemsrp_instance *instance,
3474 const struct memb_ring_id *ring_id)
3475 {
3476
3477 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3478 }
3479
totemsrp_callback_token_create(void * srp_context,void ** handle_out,enum totem_callback_token_type type,int delete,int (* callback_fn)(enum totem_callback_token_type type,const void *),const void * data)3480 int totemsrp_callback_token_create (
3481 void *srp_context,
3482 void **handle_out,
3483 enum totem_callback_token_type type,
3484 int delete,
3485 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3486 const void *data)
3487 {
3488 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3489 struct token_callback_instance *callback_handle;
3490
3491 token_hold_cancel_send (instance);
3492
3493 callback_handle = malloc (sizeof (struct token_callback_instance));
3494 if (callback_handle == 0) {
3495 return (-1);
3496 }
3497 *handle_out = (void *)callback_handle;
3498 list_init (&callback_handle->list);
3499 callback_handle->callback_fn = callback_fn;
3500 callback_handle->data = (void *) data;
3501 callback_handle->callback_type = type;
3502 callback_handle->delete = delete;
3503 switch (type) {
3504 case TOTEM_CALLBACK_TOKEN_RECEIVED:
3505 list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3506 break;
3507 case TOTEM_CALLBACK_TOKEN_SENT:
3508 list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3509 break;
3510 }
3511
3512 return (0);
3513 }
3514
totemsrp_callback_token_destroy(void * srp_context,void ** handle_out)3515 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3516 {
3517 struct token_callback_instance *h;
3518
3519 if (*handle_out) {
3520 h = (struct token_callback_instance *)*handle_out;
3521 list_del (&h->list);
3522 free (h);
3523 h = NULL;
3524 *handle_out = 0;
3525 }
3526 }
3527
/*
 * Invoke every registered token callback of the given type.
 *
 * One-shot callbacks (delete == 1) are unlinked from the list before
 * their callback runs; if the callback returns -1 the entry is
 * re-added so it is retried on the next token, otherwise it is freed.
 * Persistent callbacks (delete == 0) stay on the list.
 */
static void token_callbacks_execute (
	struct totemsrp_instance *instance,
	enum totem_callback_token_type type)
{
	struct list_head *list;
	struct list_head *list_next;
	struct list_head *callback_listhead = 0;
	struct token_callback_instance *token_callback_instance;
	int res;
	int del;

	switch (type) {
	case TOTEM_CALLBACK_TOKEN_RECEIVED:
		callback_listhead = &instance->token_callback_received_listhead;
		break;
	case TOTEM_CALLBACK_TOKEN_SENT:
		callback_listhead = &instance->token_callback_sent_listhead;
		break;
	default:
		assert (0);
	}

	/*
	 * list_next is captured before running the callback because the
	 * current entry may be unlinked (and freed) inside the loop body
	 */
	for (list = callback_listhead->next; list != callback_listhead;
		list = list_next) {

		token_callback_instance = list_entry (list, struct token_callback_instance, list);

		list_next = list->next;
		del = token_callback_instance->delete;
		if (del == 1) {
			list_del (list);
		}

		res = token_callback_instance->callback_fn (
			token_callback_instance->callback_type,
			token_callback_instance->data);
		/*
		 * This callback failed to execute, try it again on the next token
		 */
		if (res == -1 && del == 1) {
			list_add (list, callback_listhead);
		} else if (del) {
			free (token_callback_instance);
		}
	}
}
3574
3575 /*
3576 * Flow control functions
3577 */
backlog_get(struct totemsrp_instance * instance)3578 static unsigned int backlog_get (struct totemsrp_instance *instance)
3579 {
3580 unsigned int backlog = 0;
3581 struct cs_queue *queue_use = NULL;
3582
3583 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3584 if (instance->waiting_trans_ack) {
3585 queue_use = &instance->new_message_queue_trans;
3586 } else {
3587 queue_use = &instance->new_message_queue;
3588 }
3589 } else
3590 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3591 queue_use = &instance->retrans_message_queue;
3592 }
3593
3594 if (queue_use != NULL) {
3595 backlog = cs_queue_used (queue_use);
3596 }
3597
3598 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3599 return (backlog);
3600 }
3601
fcc_calculate(struct totemsrp_instance * instance,struct orf_token * token)3602 static int fcc_calculate (
3603 struct totemsrp_instance *instance,
3604 struct orf_token *token)
3605 {
3606 unsigned int transmits_allowed;
3607 unsigned int backlog_calc;
3608
3609 transmits_allowed = instance->totem_config->max_messages;
3610
3611 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3612 transmits_allowed = instance->totem_config->window_size - token->fcc;
3613 }
3614
3615 instance->my_cbl = backlog_get (instance);
3616
3617 /*
3618 * Only do backlog calculation if there is a backlog otherwise
3619 * we would result in div by zero
3620 */
3621 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3622 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3623 (token->backlog + instance->my_cbl - instance->my_pbl);
3624 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3625 transmits_allowed = backlog_calc;
3626 }
3627 }
3628
3629 return (transmits_allowed);
3630 }
3631
3632 /*
3633 * don't overflow the RTR sort queue
3634 */
static void fcc_rtr_limit (
	struct totemsrp_instance *instance,
	struct orf_token *token,
	unsigned int *transmits_allowed)
{
	/*
	 * Sanity check: the allowed transmits plus a full window must
	 * never exceed the RTR sort queue capacity
	 */
	int check = QUEUE_RTR_ITEMS_SIZE_MAX;
	check -= (*transmits_allowed + instance->totem_config->window_size);
	assert (check >= 0);
	/*
	 * If sending *transmits_allowed more messages could advance the
	 * token sequence past what the sort queue can hold beyond
	 * last_released, allow no transmits this rotation.  The
	 * comparison uses sq_lt_compare, so it is safe across sequence
	 * number wraparound.
	 */
	if (sq_lt_compare (instance->last_released +
		QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
		instance->totem_config->window_size,

		token->seq)) {

		*transmits_allowed = 0;
	}
}
3652
fcc_token_update(struct totemsrp_instance * instance,struct orf_token * token,unsigned int msgs_transmitted)3653 static void fcc_token_update (
3654 struct totemsrp_instance *instance,
3655 struct orf_token *token,
3656 unsigned int msgs_transmitted)
3657 {
3658 token->fcc += msgs_transmitted - instance->my_trc;
3659 token->backlog += instance->my_cbl - instance->my_pbl;
3660 instance->my_trc = msgs_transmitted;
3661 instance->my_pbl = instance->my_cbl;
3662 }
3663
3664 /*
3665 * Sanity checkers
3666 */
check_totemip_sanity(const struct totemsrp_instance * instance,const struct totem_ip_address * addr,int endian_conversion_needed)3667 static int check_totemip_sanity(
3668 const struct totemsrp_instance *instance,
3669 const struct totem_ip_address *addr,
3670 int endian_conversion_needed)
3671 {
3672 unsigned short family;
3673
3674 family = addr->family;
3675 if (endian_conversion_needed) {
3676 family = swab16(family);
3677 }
3678
3679 if (family != AF_INET && family != AF_INET6) {
3680 log_printf (instance->totemsrp_log_level_security,
3681 "Received message corrupted... ignoring.");
3682
3683 return (-1);
3684 }
3685
3686 return (0);
3687 }
3688
check_srpaddr_sanity(const struct totemsrp_instance * instance,const struct srp_addr * addr,int endian_conversion_needed)3689 static int check_srpaddr_sanity(
3690 const struct totemsrp_instance *instance,
3691 const struct srp_addr *addr,
3692 int endian_conversion_needed)
3693 {
3694 int i;
3695
3696 if (addr->no_addrs < 1 || addr->no_addrs > INTERFACE_MAX) {
3697 return (-1);
3698 }
3699
3700 for (i = 0; i < addr->no_addrs; i++) {
3701 if (i == 0 || addr->addr[i].family != 0) {
3702 if (check_totemip_sanity(instance, &addr->addr[i], endian_conversion_needed) == -1) {
3703 return (-1);
3704 }
3705 }
3706 }
3707
3708 return (0);
3709 }
3710
/*
 * Sanity-check an incoming orf_token: minimum length, valid ring
 * representative address, RTR list fully contained in the received
 * bytes, and every RTR entry's ring id valid.  Returns 0 when sane,
 * -1 when the message must be ignored.
 */
static int check_orf_token_sanity(
	const struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	const struct orf_token *token = (const struct orf_token *)msg;
	int rtr_entries;
	int i;

	if (msg_len < sizeof(struct orf_token)) {
		log_printf (instance->totemsrp_log_level_security,
			"Received orf_token message is too short... ignoring.");

		return (-1);
	}

	if (check_totemip_sanity(instance, &token->ring_id.rep, endian_conversion_needed) == -1) {
		return (-1);
	}

	rtr_entries = endian_conversion_needed ?
		swab32(token->rtr_list_entries) : token->rtr_list_entries;

	/*
	 * The variable-length RTR list must fit within the received bytes
	 */
	if (msg_len < sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item)) {
		log_printf (instance->totemsrp_log_level_security,
			"Received orf_token message is too short... ignoring.");

		return (-1);
	}

	for (i = 0; i < rtr_entries; i++) {
		if (check_totemip_sanity(instance, &token->rtr_list[i].ring_id.rep,
		    endian_conversion_needed) == -1) {
			return (-1);
		}
	}

	return (0);
}
3756
/*
 * Sanity-check an incoming mcast message: minimum length, valid ring
 * representative, valid sender address.  Returns 0 when sane, -1
 * when the message must be ignored.
 */
static int check_mcast_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	const struct mcast *mcast_msg = (const struct mcast *)msg;

	if (msg_len < sizeof(struct mcast)) {
		log_printf (instance->totemsrp_log_level_security,
			"Received mcast message is too short... ignoring.");

		return (-1);
	}

	if (check_totemip_sanity(instance, &mcast_msg->ring_id.rep, endian_conversion_needed) == -1) {
		return (-1);
	}
	if (check_srpaddr_sanity(instance, &mcast_msg->system_from, endian_conversion_needed) == -1) {
		return (-1);
	}

	return (0);
}
3779
/*
 * Sanity-check an incoming memb_merge_detect message: minimum length,
 * valid ring representative, valid sender address.  Returns 0 when
 * sane, -1 when the message must be ignored.
 */
static int check_memb_merge_detect_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	const struct memb_merge_detect *mmd_msg = (const struct memb_merge_detect *)msg;

	if (msg_len < sizeof(struct memb_merge_detect)) {
		log_printf (instance->totemsrp_log_level_security,
			"Received memb_merge_detect message is too short... ignoring.");

		return (-1);
	}

	if (check_totemip_sanity(instance, &mmd_msg->ring_id.rep, endian_conversion_needed) == -1) {
		return (-1);
	}
	if (check_srpaddr_sanity(instance, &mmd_msg->system_from, endian_conversion_needed) == -1) {
		return (-1);
	}

	return (0);
}
3802
/*
 * Sanity-check an incoming memb_join message: minimum length, valid
 * sender address, both trailing lists contained within the received
 * bytes, and every proc/failed list entry valid.  Returns 0 when
 * sane, -1 when the message must be ignored.
 *
 * The loop indices are unsigned to match the entry counts (the
 * original compared a signed index against unsigned counts,
 * triggering -Wsign-compare), and the tail cast keeps const.
 */
static int check_memb_join_sanity(
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	const struct memb_join *mj_msg = (const struct memb_join *)msg;
	unsigned int proc_list_entries;
	unsigned int failed_list_entries;
	size_t required_len;
	const struct srp_addr *proc_list;
	const struct srp_addr *failed_list;
	unsigned int i;

	if (msg_len < sizeof(struct memb_join)) {
		log_printf (instance->totemsrp_log_level_security,
			"Received memb_join message is too short... ignoring.");

		return (-1);
	}

	if (check_srpaddr_sanity(instance, &mj_msg->system_from, endian_conversion_needed) == -1) {
		return (-1);
	}

	proc_list_entries = mj_msg->proc_list_entries;
	failed_list_entries = mj_msg->failed_list_entries;

	if (endian_conversion_needed) {
		proc_list_entries = swab32(proc_list_entries);
		failed_list_entries = swab32(failed_list_entries);
	}

	/*
	 * Both variable-length lists must fit within the received bytes
	 */
	required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
	if (msg_len < required_len) {
		log_printf (instance->totemsrp_log_level_security,
			"Received memb_join message is too short... ignoring.");

		return (-1);
	}

	proc_list = (const struct srp_addr *)mj_msg->end_of_memb_join;
	failed_list = proc_list + proc_list_entries;

	for (i = 0; i < proc_list_entries; i++) {
		if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
			return (-1);
		}
	}

	for (i = 0; i < failed_list_entries; i++) {
		if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
			return (-1);
		}
	}

	return (0);
}
3861
check_memb_commit_token_sanity(struct totemsrp_instance * instance,const void * msg,size_t msg_len,int endian_conversion_needed)3862 static int check_memb_commit_token_sanity(
3863 struct totemsrp_instance *instance,
3864 const void *msg,
3865 size_t msg_len,
3866 int endian_conversion_needed)
3867 {
3868 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3869 unsigned int addr_entries;
3870 const struct srp_addr *addr;
3871 const struct memb_commit_token_memb_entry *memb_list;
3872 size_t required_len;
3873 int i;
3874
3875 if (msg_len < sizeof(struct memb_commit_token)) {
3876 log_printf (instance->totemsrp_log_level_security,
3877 "Received memb_commit_token message is too short... ignoring.");
3878
3879 return (0);
3880 }
3881
3882 if (check_totemip_sanity(instance, &mct_msg->ring_id.rep, endian_conversion_needed) == -1) {
3883 return (-1);
3884 }
3885
3886 addr_entries= mct_msg->addr_entries;
3887 if (endian_conversion_needed) {
3888 addr_entries = swab32(addr_entries);
3889 }
3890
3891 required_len = sizeof(struct memb_commit_token) +
3892 (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3893 if (msg_len < required_len) {
3894 log_printf (instance->totemsrp_log_level_security,
3895 "Received memb_commit_token message is too short... ignoring.");
3896
3897 return (-1);
3898 }
3899
3900 addr = (const struct srp_addr *)mct_msg->end_of_commit_token;
3901 memb_list = (const struct memb_commit_token_memb_entry *)(addr + addr_entries);
3902
3903 for (i = 0; i < addr_entries; i++) {
3904 if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3905 return (-1);
3906 }
3907
3908 if (memb_list[i].ring_id.rep.family != 0) {
3909 if (check_totemip_sanity(instance, &memb_list[i].ring_id.rep,
3910 endian_conversion_needed) == -1) {
3911 return (-1);
3912 }
3913 }
3914 }
3915
3916 return (0);
3917 }
3918
check_token_hold_cancel_sanity(struct totemsrp_instance * instance,const void * msg,size_t msg_len,int endian_conversion_needed)3919 static int check_token_hold_cancel_sanity(
3920 struct totemsrp_instance *instance,
3921 const void *msg,
3922 size_t msg_len,
3923 int endian_conversion_needed)
3924 {
3925 const struct token_hold_cancel *thc_msg = (const struct token_hold_cancel *)msg;
3926
3927 if (msg_len < sizeof(struct token_hold_cancel)) {
3928 log_printf (instance->totemsrp_log_level_security,
3929 "Received token_hold_cancel message is too short... ignoring.");
3930
3931 return (-1);
3932 }
3933
3934 if (check_totemip_sanity(instance, &thc_msg->ring_id.rep, endian_conversion_needed) == -1) {
3935 return (-1);
3936 }
3937
3938 return (0);
3939 }
3940
3941 /*
3942 * Message Handlers
3943 */
3944
/*
 * Nanosecond timestamp of the previous token arrival; read and updated
 * only by the optional GIVEINFO instrumentation in
 * message_handler_orf_token to report token rotation time.
 * NOTE(review): file-scope, non-static mutable state shared across all
 * instances -- confirm single-threaded token handling before reuse.
 */
unsigned long long int tv_old;
/*
 * Message handler called when TOKEN message type received.
 *
 * Performs one rotation's worth of protocol work: sanity checking and
 * duplicate filtering, endian normalization, merge-detection timing,
 * retransmission (rtr) servicing, flow control, aru (all-received-up-to)
 * tracking, fail-to-receive detection and -- while in recovery state --
 * the decision to re-enter operational state.  Always returns 0; a
 * return without token_send() means the token was discarded.
 */
static int message_handler_orf_token (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	char token_storage[1500];
	char token_convert[1500];
	struct orf_token *token = NULL;
	int forward_token;
	unsigned int transmits_allowed;
	unsigned int mcasted_retransmit;
	unsigned int mcasted_regular;
	unsigned int last_aru;

#ifdef GIVEINFO
	unsigned long long tv_current;
	unsigned long long tv_diff;

	tv_current = qb_util_nano_current_get ();
	tv_diff = tv_current - tv_old;
	tv_old = tv_current;

	log_printf (instance->totemsrp_log_level_debug,
		"Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
#endif

	/*
	 * Drop malformed tokens before any field is interpreted.
	 */
	if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
		return (0);
	}

	if (instance->orf_token_discard) {
		return (0);
	}
#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
	if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
		return (0);
	}
#endif

	if (endian_conversion_needed) {
		orf_token_endian_convert ((struct orf_token *)msg,
			(struct orf_token *)token_convert);
		msg = (struct orf_token *)token_convert;
	}

	/*
	 * Make copy of token and retransmit list in case we have
	 * to flush incoming messages from the kernel queue
	 */
	token = (struct orf_token *)token_storage;
	memcpy (token, msg, sizeof (struct orf_token));
	memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
		sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);


	/*
	 * Handle merge detection timeout.  An unchanged sequence number
	 * means no new multicasts were ordered during the last rotation.
	 */
	if (token->seq == instance->my_last_seq) {
		start_merge_detect_timeout (instance);
		instance->my_seq_unchanged += 1;
	} else {
		cancel_merge_detect_timeout (instance);
		cancel_token_hold_retransmit_timeout (instance);
		instance->my_seq_unchanged = 0;
	}

	instance->my_last_seq = token->seq;

#ifdef TEST_RECOVERY_MSG_COUNT
	if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
		return (0);
	}
#endif
	/*
	 * Drain pending datagrams from the transport; my copy of the
	 * token above stays valid while the receive buffer is reused.
	 */
	instance->flushing = 1;
	totemrrp_recv_flush (instance->totemrrp_context);
	instance->flushing = 0;

	/*
	 * Determine if we should hold (in reality drop) the token.
	 * The ring representative (my_ring_id.rep == my address) uses a
	 * strictly-greater threshold so it holds one rotation later
	 * than the other processors.
	 */
	instance->my_token_held = 0;
	if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
		instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
		instance->my_token_held = 1;
	} else
		if (!totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
		instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
		instance->my_token_held = 1;
	}

	/*
	 * Hold onto token when there is no activity on ring and
	 * this processor is the ring rep
	 */
	forward_token = 1;
	if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
		if (instance->my_token_held) {
			forward_token = 0;
		}
	}

	token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);

	switch (instance->memb_state) {
	case MEMB_STATE_COMMIT:
		 /* Discard token */
		break;

	case MEMB_STATE_OPERATIONAL:
		/* Free messages acknowledged by all processors (aru). */
		messages_free (instance, token->aru);
		/*
		 * Do NOT add break, this case should also execute code in gather case.
		 */

	case MEMB_STATE_GATHER:
		/*
		 * DO NOT add break, we use different free mechanism in recovery state
		 */

	case MEMB_STATE_RECOVERY:
		/*
		 * Discard tokens from another configuration
		 */
		if (memcmp (&token->ring_id, &instance->my_ring_id,
			sizeof (struct memb_ring_id)) != 0) {

			if ((forward_token)
				&& instance->use_heartbeat) {
				reset_heartbeat_timeout(instance);
			}
			else {
				cancel_heartbeat_timeout(instance);
			}

			return (0); /* discard token */
		}

		/*
		 * Discard retransmitted tokens
		 */
		if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
			return (0); /* discard token */
		}
		last_aru = instance->my_last_aru;
		instance->my_last_aru = token->aru;

		/*
		 * Service retransmission requests first, then multicast
		 * new messages within the remaining flow-control budget.
		 */
		transmits_allowed = fcc_calculate (instance, token);
		mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);

		/*
		 * A held token must still circulate if there is
		 * retransmission work outstanding.
		 */
		if (instance->my_token_held == 1 &&
			(token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
			instance->my_token_held = 0;
			forward_token = 1;
		}

		fcc_rtr_limit (instance, token, &transmits_allowed);
		mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
/*
if (mcasted_regular) {
printf ("mcasted regular %d\n", mcasted_regular);
printf ("token seq %d\n", token->seq);
}
*/
		fcc_token_update (instance, token, mcasted_retransmit +
			mcasted_regular);

		/*
		 * Update the token's aru: lower it to my_aru if I am
		 * behind, or refresh it if I set it on the last rotation
		 * (aru_addr is me) or it is unclaimed (aru_addr == 0).
		 */
		if (sq_lt_compare (instance->my_aru, token->aru) ||
			instance->my_id.addr[0].nodeid ==  token->aru_addr ||
			token->aru_addr == 0) {

			token->aru = instance->my_aru;
			if (token->aru == token->seq) {
				token->aru_addr = 0;
			} else {
				token->aru_addr = instance->my_id.addr[0].nodeid;
			}
		}
		/* Count consecutive rotations with a stalled aru. */
		if (token->aru == last_aru && token->aru_addr != 0) {
			instance->my_aru_count += 1;
		} else {
			instance->my_aru_count = 0;
		}

		/*
		 * We really don't follow specification there. In specification, OTHER nodes
		 * detect failure of one node (based on aru_count) and my_id IS NEVER added
		 * to failed list (so node never mark itself as failed)
		 */
		if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
			token->aru_addr == instance->my_id.addr[0].nodeid) {

			log_printf (instance->totemsrp_log_level_error,
				"FAILED TO RECEIVE");

			instance->failed_to_recv = 1;

			memb_set_merge (&instance->my_id, 1,
				instance->my_failed_list,
				&instance->my_failed_list_entries);

			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
		} else {
			instance->my_token_seq = token->token_seq;
			token->token_seq += 1;

			if (instance->memb_state == MEMB_STATE_RECOVERY) {
				/*
				 * instance->my_aru == instance->my_high_seq_received means this processor
				 * has recovered all messages it can recover
				 * (ie: its retrans queue is empty)
				 */
				if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {

					if (token->retrans_flg == 0) {
						token->retrans_flg = 1;
						instance->my_set_retrans_flg = 1;
					}
				} else
				if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
					token->retrans_flg = 0;
					instance->my_set_retrans_flg = 0;
				}
				log_printf (instance->totemsrp_log_level_debug,
					"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
					token->retrans_flg, instance->my_set_retrans_flg,
					cs_queue_is_empty (&instance->retrans_message_queue),
					instance->my_retrans_flg_count, token->aru);
				/*
				 * retrans_flg clear for two full rotations means
				 * every processor has drained its retrans queue.
				 */
				if (token->retrans_flg == 0) {
					instance->my_retrans_flg_count += 1;
				} else {
					instance->my_retrans_flg_count = 0;
				}
				if (instance->my_retrans_flg_count == 2) {
					instance->my_install_seq = token->seq;
				}
				log_printf (instance->totemsrp_log_level_debug,
					"install seq %x aru %x high seq received %x",
					instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
				if (instance->my_retrans_flg_count >= 2 &&
					instance->my_received_flg == 0 &&
					sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
					instance->my_received_flg = 1;
					instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
					memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
						sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
				}
				if (instance->my_retrans_flg_count >= 3 &&
					sq_lte_compare (instance->my_install_seq, token->aru)) {
					instance->my_rotation_counter += 1;
				} else {
					instance->my_rotation_counter = 0;
				}
				if (instance->my_rotation_counter == 2) {
				log_printf (instance->totemsrp_log_level_debug,
					"retrans flag count %x token aru %x install seq %x aru %x %x",
					instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
					instance->my_aru, token->seq);

					memb_state_operational_enter (instance);
					instance->my_rotation_counter = 0;
					instance->my_retrans_flg_count = 0;
				}
			}

			totemrrp_send_flush (instance->totemrrp_context);
			token_send (instance, token, forward_token);

#ifdef GIVEINFO
			tv_current = qb_util_nano_current_get ();
			tv_diff = tv_current - tv_old;
			tv_old = tv_current;
			log_printf (instance->totemsrp_log_level_debug,
				"I held %0.4f ms",
				((float)tv_diff) / 1000000.0);
#endif
			if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
				messages_deliver_to_app (instance, 0,
					instance->my_high_seq_received);
			}

			/*
			 * Deliver messages after token has been transmitted
			 * to improve performance
			 */
			reset_token_timeout (instance); // REVIEWED
			reset_token_retransmit_timeout (instance); // REVIEWED
			if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
				instance->my_token_held == 1) {

				start_token_hold_retransmit_timeout (instance);
			}

			token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
		}
		break;
	}

	if ((forward_token)
		&& instance->use_heartbeat) {
		reset_heartbeat_timeout(instance);
	}
	else {
		cancel_heartbeat_timeout(instance);
	}

	return (0);
}
4258
/*
 * Deliver ordered multicast messages from the regular sort queue to the
 * application, advancing instance->my_high_delivered up to (at most)
 * end_point.
 *
 * skip == 0: stop at the first hole (normal agreed-order delivery).
 * skip != 0: skip over holes and over messages whose originator is not
 *            in my_deliver_memb_list (recovery-time delivery).
 *
 * Payloads are handed to totemsrp_deliver_fn with the mcast header
 * stripped; endian conversion of the payload is left to the consumer
 * via the endian_conversion_required flag.
 */
static void messages_deliver_to_app (
	struct totemsrp_instance *instance,
	int skip,
	unsigned int end_point)
{
	struct sort_queue_item *sort_queue_item_p;
	unsigned int i;
	int res;
	struct mcast *mcast_in;
	struct mcast mcast_header;
	unsigned int range = 0;
	int endian_conversion_required;
	unsigned int my_high_delivered_stored = 0;


	/* Sequence arithmetic: wraps correctly for unsigned seq numbers. */
	range = end_point - instance->my_high_delivered;

	if (range) {
		log_printf (instance->totemsrp_log_level_trace,
			"Delivering %x to %x", instance->my_high_delivered,
			end_point);
	}
	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
	my_high_delivered_stored = instance->my_high_delivered;

	/*
	 * Deliver messages in order from rtr queue to pending delivery queue
	 */
	for (i = 1; i <= range; i++) {

		void *ptr = 0;

		/*
		 * If out of range of sort queue, stop assembly
		 */
		res = sq_in_range (&instance->regular_sort_queue,
			my_high_delivered_stored + i);
		if (res == 0) {
			break;
		}

		res = sq_item_get (&instance->regular_sort_queue,
			my_high_delivered_stored + i, &ptr);
		/*
		 * If hole, stop assembly
		 */
		if (res != 0 && skip == 0) {
			break;
		}

		instance->my_high_delivered = my_high_delivered_stored + i;

		/* Hole while skipping: advance past it without delivering. */
		if (res != 0) {
			continue;

		}

		sort_queue_item_p = ptr;

		mcast_in = sort_queue_item_p->mcast;
		/* Guard against a freed/poisoned queue entry. */
		assert (mcast_in != (struct mcast *)0xdeadbeef);

		endian_conversion_required = 0;
		if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
			endian_conversion_required = 1;
			mcast_endian_convert (mcast_in, &mcast_header);
		} else {
			memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
		}

		/*
		 * Skip messages not originated in instance->my_deliver_memb
		 */
		if (skip &&
			memb_set_subset (&mcast_header.system_from,
				1,
				instance->my_deliver_memb_list,
				instance->my_deliver_memb_entries) == 0) {

			instance->my_high_delivered = my_high_delivered_stored + i;

			continue;
		}

		/*
		 * Message found
		 */
		log_printf (instance->totemsrp_log_level_trace,
			"Delivering MCAST message with seq %x to pending delivery queue",
			mcast_header.seq);

		/*
		 * Message is locally originated multicast
		 */
		instance->totemsrp_deliver_fn (
			mcast_header.header.nodeid,
			((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
			sort_queue_item_p->msg_len - sizeof (struct mcast),
			endian_conversion_required);
	}
}
4360
/*
 * recv message handler called when MCAST message type received.
 *
 * Sanity-checks and endian-normalizes the header, files the message
 * into the recovery or regular sort queue (encapsulated messages
 * belong to the old ring being recovered), triggers a membership
 * gather when the message comes from a foreign configuration, and in
 * operational state delivers any newly in-order messages to the app.
 */
static int message_handler_mcast (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	struct sort_queue_item sort_queue_item;
	struct sq *sort_queue;
	struct mcast mcast_header;

	if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
		return (0);
	}

	/* Only the header is converted here; the payload is converted
	 * (if needed) at delivery time. */
	if (endian_conversion_needed) {
		mcast_endian_convert (msg, &mcast_header);
	} else {
		memcpy (&mcast_header, msg, sizeof (struct mcast));
	}

	if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
		sort_queue = &instance->recovery_sort_queue;
	} else {
		sort_queue = &instance->regular_sort_queue;
	}

	assert (msg_len <= FRAME_SIZE_MAX);

#ifdef TEST_DROP_MCAST_PERCENTAGE
	if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
		return (0);
	}
#endif

	/*
	 * If the message is foreign execute the switch below
	 */
	if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
		sizeof (struct memb_ring_id)) != 0) {

		switch (instance->memb_state) {
		case MEMB_STATE_OPERATIONAL:
			/* Foreign sender: merge it and start a new gather. */
			memb_set_merge (
				&mcast_header.system_from, 1,
				instance->my_proc_list, &instance->my_proc_list_entries);
			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
			break;

		case MEMB_STATE_GATHER:
			if (!memb_set_subset (
				&mcast_header.system_from,
				1,
				instance->my_proc_list,
				instance->my_proc_list_entries)) {

				memb_set_merge (&mcast_header.system_from, 1,
					instance->my_proc_list, &instance->my_proc_list_entries);
				memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
				return (0);
			}
			break;

		case MEMB_STATE_COMMIT:
			/* discard message */
			instance->stats.rx_msg_dropped++;
			break;

		case MEMB_STATE_RECOVERY:
			/* discard message */
			instance->stats.rx_msg_dropped++;
			break;
		}
		return (0);
	}

	log_printf (instance->totemsrp_log_level_trace,
		"Received ringid(%s:%lld) seq %x",
		totemip_print (&mcast_header.ring_id.rep),
		mcast_header.ring_id.seq,
		mcast_header.seq);

	/*
	 * Add mcast message to rtr queue if not already in rtr queue
	 * otherwise free io vectors
	 */
	if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
		sq_in_range (sort_queue, mcast_header.seq) &&
		sq_item_inuse (sort_queue, mcast_header.seq) == 0) {

		/*
		 * Allocate new multicast memory block
		 */
// TODO LEAK
		sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
		if (sort_queue_item.mcast == NULL) {
			return (-1); /* error here is corrected by the algorithm */
		}
		memcpy (sort_queue_item.mcast, msg, msg_len);
		sort_queue_item.msg_len = msg_len;

		if (sq_lt_compare (instance->my_high_seq_received,
			mcast_header.seq)) {
			instance->my_high_seq_received = mcast_header.seq;
		}

		sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
	}

	update_aru (instance);
	if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
		messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
	}

/* TODO remove from retrans message queue for old ring in recovery state */
	return (0);
}
4480
message_handler_memb_merge_detect(struct totemsrp_instance * instance,const void * msg,size_t msg_len,int endian_conversion_needed)4481 static int message_handler_memb_merge_detect (
4482 struct totemsrp_instance *instance,
4483 const void *msg,
4484 size_t msg_len,
4485 int endian_conversion_needed)
4486 {
4487 struct memb_merge_detect memb_merge_detect;
4488
4489 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4490 return (0);
4491 }
4492
4493 if (endian_conversion_needed) {
4494 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4495 } else {
4496 memcpy (&memb_merge_detect, msg,
4497 sizeof (struct memb_merge_detect));
4498 }
4499
4500 /*
4501 * do nothing if this is a merge detect from this configuration
4502 */
4503 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4504 sizeof (struct memb_ring_id)) == 0) {
4505
4506 return (0);
4507 }
4508
4509 /*
4510 * Execute merge operation
4511 */
4512 switch (instance->memb_state) {
4513 case MEMB_STATE_OPERATIONAL:
4514 memb_set_merge (&memb_merge_detect.system_from, 1,
4515 instance->my_proc_list, &instance->my_proc_list_entries);
4516 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4517 break;
4518
4519 case MEMB_STATE_GATHER:
4520 if (!memb_set_subset (
4521 &memb_merge_detect.system_from,
4522 1,
4523 instance->my_proc_list,
4524 instance->my_proc_list_entries)) {
4525
4526 memb_set_merge (&memb_merge_detect.system_from, 1,
4527 instance->my_proc_list, &instance->my_proc_list_entries);
4528 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4529 return (0);
4530 }
4531 break;
4532
4533 case MEMB_STATE_COMMIT:
4534 /* do nothing in commit */
4535 break;
4536
4537 case MEMB_STATE_RECOVERY:
4538 /* do nothing in recovery */
4539 break;
4540 }
4541 return (0);
4542 }
4543
/*
 * Process a (sanity-checked, host-endian) memb_join message.
 *
 * Implements the Totem membership gather logic: compare the sender's
 * proc/failed lists against ours, record consensus when they match,
 * and merge in any new information otherwise.  A join stamped with
 * LEAVE_DUMMY_NODEID is a synthetic LEAVE notification whose last
 * failed-list entry names the leaving node.  May enter commit state
 * (when consensus is reached and we are the lowest processor, or after
 * a failed-to-recv recovery) or gather state (when new membership
 * information was merged).
 */
static void memb_join_process (
	struct totemsrp_instance *instance,
	const struct memb_join *memb_join)
{
	struct srp_addr *proc_list;
	struct srp_addr *failed_list;
	int gather_entered = 0;
	int fail_minus_memb_entries = 0;
	struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];

	/* Variable-length lists follow the fixed header. */
	proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
	failed_list = proc_list + memb_join->proc_list_entries;

/*
	memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
	memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
	memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
	memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
-*/

	/*
	 * Handle synthetic LEAVE joins; during a receive flush they are
	 * recorded but otherwise discarded.
	 */
	if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
		if (instance->flushing) {
			if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
				log_printf (instance->totemsrp_log_level_warning,
					"Discarding LEAVE message during flush, nodeid=%u",
					memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
				if (memb_join->failed_list_entries > 0) {
					my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
				}
			} else {
				log_printf (instance->totemsrp_log_level_warning,
					"Discarding JOIN message during flush, nodeid=%d", memb_join->header.nodeid);
			}
			return;
		} else {
			if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
				log_printf (instance->totemsrp_log_level_debug,
					"Received LEAVE message from %u", memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
				if (memb_join->failed_list_entries > 0) {
					my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
				}
			}
		}

	}

	/*
	 * Case 1: the sender agrees with our view exactly -> record
	 * consensus; when everyone agrees and we are lowest, create and
	 * send the commit token.
	 */
	if (memb_set_equal (proc_list,
		memb_join->proc_list_entries,
		instance->my_proc_list,
		instance->my_proc_list_entries) &&

	memb_set_equal (failed_list,
		memb_join->failed_list_entries,
		instance->my_failed_list,
		instance->my_failed_list_entries)) {

		memb_consensus_set (instance, &memb_join->system_from);

		/* After a fail-to-recv, restart as a singleton ring. */
		if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
			instance->failed_to_recv = 0;
			srp_addr_copy (&instance->my_proc_list[0],
				&instance->my_id);
			instance->my_proc_list_entries = 1;
			instance->my_failed_list_entries = 0;

			memb_state_commit_token_create (instance);

			memb_state_commit_enter (instance);
			return;
		}
		if (memb_consensus_agreed (instance) &&
			memb_lowest_in_config (instance)) {

			memb_state_commit_token_create (instance);

			memb_state_commit_enter (instance);
		} else {
			goto out;
		}
	} else
	/*
	 * Case 2: the sender's lists are a subset of ours -- nothing new.
	 */
	if (memb_set_subset (proc_list,
		memb_join->proc_list_entries,
		instance->my_proc_list,
		instance->my_proc_list_entries) &&

		memb_set_subset (failed_list,
		memb_join->failed_list_entries,
		instance->my_failed_list,
		instance->my_failed_list_entries)) {

		goto out;
	} else
	/*
	 * Case 3: the sender is on our failed list -- ignore its join.
	 */
	if (memb_set_subset (&memb_join->system_from, 1,
		instance->my_failed_list, instance->my_failed_list_entries)) {

		goto out;
	} else {
		/*
		 * Case 4: new information -- merge it and (re)enter gather.
		 */
		memb_set_merge (proc_list,
			memb_join->proc_list_entries,
			instance->my_proc_list, &instance->my_proc_list_entries);

		/*
		 * If the sender considers US failed, mark the sender failed;
		 * otherwise adopt its failed list (excluding current members
		 * when the sender itself is already marked failed).
		 */
		if (memb_set_subset (
			&instance->my_id, 1,
			failed_list, memb_join->failed_list_entries)) {

			memb_set_merge (
				&memb_join->system_from, 1,
				instance->my_failed_list, &instance->my_failed_list_entries);
		} else {
			if (memb_set_subset (
				&memb_join->system_from, 1,
				instance->my_memb_list,
				instance->my_memb_entries)) {

				if (memb_set_subset (
					&memb_join->system_from, 1,
					instance->my_failed_list,
					instance->my_failed_list_entries) == 0) {

					memb_set_merge (failed_list,
						memb_join->failed_list_entries,
						instance->my_failed_list, &instance->my_failed_list_entries);
				} else {
					memb_set_subtract (fail_minus_memb,
						&fail_minus_memb_entries,
						failed_list,
						memb_join->failed_list_entries,
						instance->my_memb_list,
						instance->my_memb_entries);

					memb_set_merge (fail_minus_memb,
						fail_minus_memb_entries,
						instance->my_failed_list,
						&instance->my_failed_list_entries);
				}
			}
		}
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
		gather_entered = 1;
	}

out:
	/*
	 * Any join seen while operational requires a gather round even
	 * when no new information was merged above.
	 */
	if (gather_entered == 0 &&
		instance->memb_state == MEMB_STATE_OPERATIONAL) {

		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
	}
}
4692
memb_join_endian_convert(const struct memb_join * in,struct memb_join * out)4693 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4694 {
4695 int i;
4696 struct srp_addr *in_proc_list;
4697 struct srp_addr *in_failed_list;
4698 struct srp_addr *out_proc_list;
4699 struct srp_addr *out_failed_list;
4700
4701 out->header.type = in->header.type;
4702 out->header.endian_detector = ENDIAN_LOCAL;
4703 out->header.nodeid = swab32 (in->header.nodeid);
4704 srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4705 out->proc_list_entries = swab32 (in->proc_list_entries);
4706 out->failed_list_entries = swab32 (in->failed_list_entries);
4707 out->ring_seq = swab64 (in->ring_seq);
4708
4709 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4710 in_failed_list = in_proc_list + out->proc_list_entries;
4711 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4712 out_failed_list = out_proc_list + out->proc_list_entries;
4713
4714 for (i = 0; i < out->proc_list_entries; i++) {
4715 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4716 }
4717 for (i = 0; i < out->failed_list_entries; i++) {
4718 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4719 }
4720 }
4721
memb_commit_token_endian_convert(const struct memb_commit_token * in,struct memb_commit_token * out)4722 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4723 {
4724 int i;
4725 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4726 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4727 struct memb_commit_token_memb_entry *in_memb_list;
4728 struct memb_commit_token_memb_entry *out_memb_list;
4729
4730 out->header.type = in->header.type;
4731 out->header.endian_detector = ENDIAN_LOCAL;
4732 out->header.nodeid = swab32 (in->header.nodeid);
4733 out->token_seq = swab32 (in->token_seq);
4734 totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4735 out->ring_id.seq = swab64 (in->ring_id.seq);
4736 out->retrans_flg = swab32 (in->retrans_flg);
4737 out->memb_index = swab32 (in->memb_index);
4738 out->addr_entries = swab32 (in->addr_entries);
4739
4740 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4741 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4742 for (i = 0; i < out->addr_entries; i++) {
4743 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4744
4745 /*
4746 * Only convert the memb entry if it has been set
4747 */
4748 if (in_memb_list[i].ring_id.rep.family != 0) {
4749 totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
4750 &in_memb_list[i].ring_id.rep);
4751
4752 out_memb_list[i].ring_id.seq =
4753 swab64 (in_memb_list[i].ring_id.seq);
4754 out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4755 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4756 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4757 }
4758 }
4759 }
4760
orf_token_endian_convert(const struct orf_token * in,struct orf_token * out)4761 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4762 {
4763 int i;
4764
4765 out->header.type = in->header.type;
4766 out->header.endian_detector = ENDIAN_LOCAL;
4767 out->header.nodeid = swab32 (in->header.nodeid);
4768 out->seq = swab32 (in->seq);
4769 out->token_seq = swab32 (in->token_seq);
4770 out->aru = swab32 (in->aru);
4771 totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4772 out->aru_addr = swab32(in->aru_addr);
4773 out->ring_id.seq = swab64 (in->ring_id.seq);
4774 out->fcc = swab32 (in->fcc);
4775 out->backlog = swab32 (in->backlog);
4776 out->retrans_flg = swab32 (in->retrans_flg);
4777 out->rtr_list_entries = swab32 (in->rtr_list_entries);
4778 for (i = 0; i < out->rtr_list_entries; i++) {
4779 totemip_copy_endian_convert(&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
4780 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4781 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4782 }
4783 }
4784
/*
 * Byte-swap the fixed-size mcast header from foreign endianness into
 * host order; the payload following the header is not touched.
 */
static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
{
	/* Message header: the endian detector is rewritten to local */
	out->header.type = in->header.type;
	out->header.endian_detector = ENDIAN_LOCAL;
	out->header.nodeid = swab32 (in->header.nodeid);
	out->header.encapsulated = in->header.encapsulated;

	/* Ring identifier */
	totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
	out->ring_id.seq = swab64 (in->ring_id.seq);

	/* Sequencing fields and originator */
	out->seq = swab32 (in->seq);
	out->this_seqno = swab32 (in->this_seqno);
	out->node_id = swab32 (in->node_id);
	out->guarantee = swab32 (in->guarantee);
	srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
}
4800
memb_merge_detect_endian_convert(const struct memb_merge_detect * in,struct memb_merge_detect * out)4801 static void memb_merge_detect_endian_convert (
4802 const struct memb_merge_detect *in,
4803 struct memb_merge_detect *out)
4804 {
4805 out->header.type = in->header.type;
4806 out->header.endian_detector = ENDIAN_LOCAL;
4807 out->header.nodeid = swab32 (in->header.nodeid);
4808 totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4809 out->ring_id.seq = swab64 (in->ring_id.seq);
4810 srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4811 }
4812
ignore_join_under_operational(struct totemsrp_instance * instance,const struct memb_join * memb_join)4813 static int ignore_join_under_operational (
4814 struct totemsrp_instance *instance,
4815 const struct memb_join *memb_join)
4816 {
4817 struct srp_addr *proc_list;
4818 struct srp_addr *failed_list;
4819 unsigned long long ring_seq;
4820
4821 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4822 failed_list = proc_list + memb_join->proc_list_entries;
4823 ring_seq = memb_join->ring_seq;
4824
4825 if (memb_set_subset (&instance->my_id, 1,
4826 failed_list, memb_join->failed_list_entries)) {
4827 return (1);
4828 }
4829
4830 /*
4831 * In operational state, my_proc_list is exactly the same as
4832 * my_memb_list.
4833 */
4834 if ((memb_set_subset (&memb_join->system_from, 1,
4835 instance->my_memb_list, instance->my_memb_entries)) &&
4836 (ring_seq < instance->my_ring_id.seq)) {
4837 return (1);
4838 }
4839
4840 return (0);
4841 }
4842
message_handler_memb_join(struct totemsrp_instance * instance,const void * msg,size_t msg_len,int endian_conversion_needed)4843 static int message_handler_memb_join (
4844 struct totemsrp_instance *instance,
4845 const void *msg,
4846 size_t msg_len,
4847 int endian_conversion_needed)
4848 {
4849 const struct memb_join *memb_join;
4850 struct memb_join *memb_join_convert = alloca (msg_len);
4851
4852 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4853 return (0);
4854 }
4855
4856 if (endian_conversion_needed) {
4857 memb_join = memb_join_convert;
4858 memb_join_endian_convert (msg, memb_join_convert);
4859
4860 } else {
4861 memb_join = msg;
4862 }
4863 /*
4864 * If the process paused because it wasn't scheduled in a timely
4865 * fashion, flush the join messages because they may be queued
4866 * entries
4867 */
4868 if (pause_flush (instance)) {
4869 return (0);
4870 }
4871
4872 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4873 instance->token_ring_id_seq = memb_join->ring_seq;
4874 }
4875 switch (instance->memb_state) {
4876 case MEMB_STATE_OPERATIONAL:
4877 if (!ignore_join_under_operational (instance, memb_join)) {
4878 memb_join_process (instance, memb_join);
4879 }
4880 break;
4881
4882 case MEMB_STATE_GATHER:
4883 memb_join_process (instance, memb_join);
4884 break;
4885
4886 case MEMB_STATE_COMMIT:
4887 if (memb_set_subset (&memb_join->system_from,
4888 1,
4889 instance->my_new_memb_list,
4890 instance->my_new_memb_entries) &&
4891
4892 memb_join->ring_seq >= instance->my_ring_id.seq) {
4893
4894 memb_join_process (instance, memb_join);
4895 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4896 }
4897 break;
4898
4899 case MEMB_STATE_RECOVERY:
4900 if (memb_set_subset (&memb_join->system_from,
4901 1,
4902 instance->my_new_memb_list,
4903 instance->my_new_memb_entries) &&
4904
4905 memb_join->ring_seq >= instance->my_ring_id.seq) {
4906
4907 memb_join_process (instance, memb_join);
4908 memb_recovery_state_token_loss (instance);
4909 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4910 }
4911 break;
4912 }
4913 return (0);
4914 }
4915
message_handler_memb_commit_token(struct totemsrp_instance * instance,const void * msg,size_t msg_len,int endian_conversion_needed)4916 static int message_handler_memb_commit_token (
4917 struct totemsrp_instance *instance,
4918 const void *msg,
4919 size_t msg_len,
4920 int endian_conversion_needed)
4921 {
4922 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4923 struct memb_commit_token *memb_commit_token;
4924 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4925 int sub_entries;
4926
4927 struct srp_addr *addr;
4928
4929 log_printf (instance->totemsrp_log_level_debug,
4930 "got commit token");
4931
4932 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4933 return (0);
4934 }
4935
4936 if (endian_conversion_needed) {
4937 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4938 } else {
4939 memcpy (memb_commit_token_convert, msg, msg_len);
4940 }
4941 memb_commit_token = memb_commit_token_convert;
4942 addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4943
4944 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4945 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4946 return (0);
4947 }
4948 #endif
4949 switch (instance->memb_state) {
4950 case MEMB_STATE_OPERATIONAL:
4951 /* discard token */
4952 break;
4953
4954 case MEMB_STATE_GATHER:
4955 memb_set_subtract (sub, &sub_entries,
4956 instance->my_proc_list, instance->my_proc_list_entries,
4957 instance->my_failed_list, instance->my_failed_list_entries);
4958
4959 if (memb_set_equal (addr,
4960 memb_commit_token->addr_entries,
4961 sub,
4962 sub_entries) &&
4963
4964 memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4965 memcpy (instance->commit_token, memb_commit_token, msg_len);
4966 memb_state_commit_enter (instance);
4967 }
4968 break;
4969
4970 case MEMB_STATE_COMMIT:
4971 /*
4972 * If retransmitted commit tokens are sent on this ring
4973 * filter them out and only enter recovery once the
4974 * commit token has traversed the array. This is
4975 * determined by :
4976 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4977 */
4978 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4979 memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4980 memb_state_recovery_enter (instance, memb_commit_token);
4981 }
4982 break;
4983
4984 case MEMB_STATE_RECOVERY:
4985 if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
4986
4987 /* Filter out duplicated tokens */
4988 if (instance->originated_orf_token) {
4989 break;
4990 }
4991
4992 instance->originated_orf_token = 1;
4993
4994 log_printf (instance->totemsrp_log_level_debug,
4995 "Sending initial ORF token");
4996
4997 // TODO convert instead of initiate
4998 orf_token_send_initial (instance);
4999 reset_token_timeout (instance); // REVIEWED
5000 reset_token_retransmit_timeout (instance); // REVIEWED
5001 }
5002 break;
5003 }
5004 return (0);
5005 }
5006
message_handler_token_hold_cancel(struct totemsrp_instance * instance,const void * msg,size_t msg_len,int endian_conversion_needed)5007 static int message_handler_token_hold_cancel (
5008 struct totemsrp_instance *instance,
5009 const void *msg,
5010 size_t msg_len,
5011 int endian_conversion_needed)
5012 {
5013 const struct token_hold_cancel *token_hold_cancel = msg;
5014
5015 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
5016 return (0);
5017 }
5018
5019 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
5020 sizeof (struct memb_ring_id)) == 0) {
5021
5022 instance->my_seq_unchanged = 0;
5023 if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
5024 timer_function_token_retransmit_timeout (instance);
5025 }
5026 }
5027 return (0);
5028 }
5029
main_deliver_fn(void * context,const void * msg,unsigned int msg_len)5030 void main_deliver_fn (
5031 void *context,
5032 const void *msg,
5033 unsigned int msg_len)
5034 {
5035 struct totemsrp_instance *instance = context;
5036 const struct message_header *message_header = msg;
5037
5038 if (msg_len < sizeof (struct message_header)) {
5039 log_printf (instance->totemsrp_log_level_security,
5040 "Received message is too short... ignoring %u.",
5041 (unsigned int)msg_len);
5042 return;
5043 }
5044
5045
5046 switch (message_header->type) {
5047 case MESSAGE_TYPE_ORF_TOKEN:
5048 instance->stats.orf_token_rx++;
5049 break;
5050 case MESSAGE_TYPE_MCAST:
5051 instance->stats.mcast_rx++;
5052 break;
5053 case MESSAGE_TYPE_MEMB_MERGE_DETECT:
5054 instance->stats.memb_merge_detect_rx++;
5055 break;
5056 case MESSAGE_TYPE_MEMB_JOIN:
5057 instance->stats.memb_join_rx++;
5058 break;
5059 case MESSAGE_TYPE_MEMB_COMMIT_TOKEN:
5060 instance->stats.memb_commit_token_rx++;
5061 break;
5062 case MESSAGE_TYPE_TOKEN_HOLD_CANCEL:
5063 instance->stats.token_hold_cancel_rx++;
5064 break;
5065 default:
5066 log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
5067 printf ("wrong message type\n");
5068 instance->stats.rx_msg_dropped++;
5069 return;
5070 }
5071 /*
5072 * Handle incoming message
5073 */
5074 totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5075 instance,
5076 msg,
5077 msg_len,
5078 message_header->endian_detector != ENDIAN_LOCAL);
5079 }
5080
/*
 * Called by the lower layer when interface iface_no comes up with
 * address iface_addr.  Records the address, creates or loads the
 * persistent ring id on the first interface change, registers the
 * statically configured members of that interface, and starts the
 * membership gather once all configured interfaces have reported.
 */
void main_iface_change_fn (
	void *context,
	const struct totem_ip_address *iface_addr,
	unsigned int iface_no)
{
	struct totemsrp_instance *instance = context;
	int i;

	totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
	assert (instance->my_id.addr[iface_no].nodeid);

	totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);

	/*
	 * First interface to come up: establish the ring id (created
	 * fresh or loaded from stable storage via the configured
	 * callback) before any membership activity can reference it.
	 */
	if (instance->iface_changes++ == 0) {
		instance->memb_ring_id_create_or_load (&instance->my_ring_id,
		    &instance->my_id.addr[0]);
		instance->token_ring_id_seq = instance->my_ring_id.seq;
		log_printf (
			instance->totemsrp_log_level_debug,
			"Created or loaded sequence id %llx.%s for this ring.",
			instance->my_ring_id.seq,
			totemip_print (&instance->my_ring_id.rep));

		if (instance->totemsrp_service_ready_fn) {
			instance->totemsrp_service_ready_fn ();
		}

	}

	/* Register this interface's statically configured member list */
	for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
		totemsrp_member_add (instance,
			&instance->totem_config->interfaces[iface_no].member_list[i],
			iface_no);
	}

	/* All interfaces up: begin forming a membership */
	if (instance->iface_changes >= instance->totem_config->interface_count) {
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
	}
}
5120
totemsrp_net_mtu_adjust(struct totem_config * totem_config)5121 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
5122 totem_config->net_mtu -= sizeof (struct mcast);
5123 }
5124
totemsrp_service_ready_register(void * context,void (* totem_service_ready)(void))5125 void totemsrp_service_ready_register (
5126 void *context,
5127 void (*totem_service_ready) (void))
5128 {
5129 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5130
5131 instance->totemsrp_service_ready_fn = totem_service_ready;
5132 }
5133
totemsrp_member_add(void * context,const struct totem_ip_address * member,int ring_no)5134 int totemsrp_member_add (
5135 void *context,
5136 const struct totem_ip_address *member,
5137 int ring_no)
5138 {
5139 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5140 int res;
5141
5142 res = totemrrp_member_add (instance->totemrrp_context, member, ring_no);
5143
5144 return (res);
5145 }
5146
totemsrp_member_remove(void * context,const struct totem_ip_address * member,int ring_no)5147 int totemsrp_member_remove (
5148 void *context,
5149 const struct totem_ip_address *member,
5150 int ring_no)
5151 {
5152 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5153 int res;
5154
5155 res = totemrrp_member_remove (instance->totemrrp_context, member, ring_no);
5156
5157 return (res);
5158 }
5159
totemsrp_threaded_mode_enable(void * context)5160 void totemsrp_threaded_mode_enable (void *context)
5161 {
5162 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5163
5164 instance->threaded_mode_enabled = 1;
5165 }
5166
totemsrp_trans_ack(void * context)5167 void totemsrp_trans_ack (void *context)
5168 {
5169 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5170
5171 instance->waiting_trans_ack = 0;
5172 instance->totemsrp_waiting_trans_ack_cb_fn (0);
5173 }
5174