1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2009 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  *   this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  *   this list of conditions and the following disclaimer in the documentation
18  *   and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  *   contributors may be used to endorse or promote products derived from this
21  *   software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
 *	http://www.cs.jhu.edu/~yairamir/phd.ps (ch. 4-5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * 	http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
 * - encryption of message contents with NSS
 * - authentication of message contents with SHA1/HMAC
 * - token hold mode where the token doesn't rotate on an unused ring - this
 *   reduces CPU usage on a 1.6 GHz Xeon from 35% to less than 0.1% as measured by top
48  */
49 
50 #include <config.h>
51 
52 #include <assert.h>
53 #ifdef HAVE_ALLOCA_H
54 #include <alloca.h>
55 #endif
56 #include <sys/mman.h>
57 #include <sys/types.h>
58 #include <sys/stat.h>
59 #include <sys/socket.h>
60 #include <netdb.h>
61 #include <sys/un.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #include <unistd.h>
67 #include <fcntl.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <errno.h>
71 #include <sched.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <sys/poll.h>
75 #include <sys/uio.h>
76 #include <limits.h>
77 
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
81 
82 #include <corosync/swab.h>
83 #include <corosync/sq.h>
84 #include <corosync/list.h>
85 
86 #define LOGSYS_UTILS_ONLY 1
87 #include <corosync/logsys.h>
88 
89 #include "totemsrp.h"
90 #include "totemrrp.h"
91 #include "totemnet.h"
92 
93 #include "cs_queue.h"
94 
95 #define LOCALHOST_IP				inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX		16384 /* allow 16384 retransmit items */
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX		16384 /* allow 16384 messages to be queued */
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX		500 /* allow 500 messages to be queued */
99 #define MAXIOVS					5
100 #define RETRANSMIT_ENTRIES_MAX			30
101 #define TOKEN_SIZE_MAX				64000 /* bytes */
102 #define LEAVE_DUMMY_NODEID                      0
103 
/*
 * Rollover handling:
 * SEQNO_START_MSG is the starting sequence number after a new configuration.
 *	This should remain zero, unless testing overflow, in which case
 *	0x7ffff000 and 0xfffff000 are good starting values.
 *
 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
 *	for a token.  This should remain zero, unless testing overflow, in which
 *	case 0x7fffff00 or 0xffffff00 are good starting values.
 */
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
116 
117 /*
 * These can be used to test different rollover points
119  * #define SEQNO_START_MSG 0xfffffe00
120  * #define SEQNO_START_TOKEN 0xfffffe00
121  */
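
/*
 * Illustrative sketch (not code used by this file, shown only to explain the
 * rollover test values above): once 32-bit sequence numbers are allowed to
 * wrap, "a precedes b" cannot be a plain '<' comparison.  A minimal
 * wrap-safe comparison looks like:
 *
 *	static inline int seqno_lt (unsigned int a, unsigned int b)
 *	{
 *		return ((int)(a - b)) < 0;
 *	}
 */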
122 
123 /*
124  * These can be used to test the error recovery algorithms
125  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
126  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
127  * #define TEST_DROP_MCAST_PERCENTAGE 50
128  * #define TEST_RECOVERY_MSG_COUNT 300
129  */
130 
/*
 * We compare the endian_detector field of incoming messages against this
 * local value to determine whether the sender's byte order differs from ours;
 * if it does, the message is converted.
 *
 * Do not change this value.
 */
137 #define ENDIAN_LOCAL					 0xff22
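
/*
 * Illustrative sketch (an assumption for exposition; the actual check is
 * performed in the message delivery path later in this file): given a
 * received buffer msg, the receiver compares the sender's endian_detector
 * field against the local constant, e.g.
 *
 *	const struct message_header *hdr = msg;
 *	int endian_conversion_needed = (hdr->endian_detector != ENDIAN_LOCAL);
 *
 * and, if conversion is needed, the per-type *_endian_convert helpers are
 * applied before the message is processed.
 */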
138 
139 enum message_type {
140 	MESSAGE_TYPE_ORF_TOKEN = 0,			/* Ordering, Reliability, Flow (ORF) control Token */
141 	MESSAGE_TYPE_MCAST = 1,				/* ring ordered multicast message */
142 	MESSAGE_TYPE_MEMB_MERGE_DETECT = 2,	/* merge rings if there are available rings */
143 	MESSAGE_TYPE_MEMB_JOIN = 3,			/* membership join message */
144 	MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4,	/* membership commit token */
145 	MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5,	/* cancel the holding of the token */
146 };
147 
148 enum encapsulation_type {
149 	MESSAGE_ENCAPSULATED = 1,
150 	MESSAGE_NOT_ENCAPSULATED = 2
151 };
152 
153 /*
154  * New membership algorithm local variables
155  */
156 struct consensus_list_item {
157 	struct srp_addr addr;
158 	int set;
159 };
160 
161 
162 struct token_callback_instance {
163 	struct list_head list;
164 	int (*callback_fn) (enum totem_callback_token_type type, const void *);
165 	enum totem_callback_token_type callback_type;
166 	int delete;
167 	void *data;
168 };
169 
170 
171 struct totemsrp_socket {
172 	int mcast;
173 	int token;
174 };
175 
176 struct message_header {
177 	char type;
178 	char encapsulated;
179 	unsigned short endian_detector;
180 	unsigned int nodeid;
181 } __attribute__((packed));
182 
183 
184 struct mcast {
185 	struct message_header header;
186 	struct srp_addr system_from;
187 	unsigned int seq;
188 	int this_seqno;
189 	struct memb_ring_id ring_id;
190 	unsigned int node_id;
191 	int guarantee;
192 } __attribute__((packed));
193 
194 
195 struct rtr_item  {
196 	struct memb_ring_id ring_id;
197 	unsigned int seq;
198 }__attribute__((packed));
199 
200 
201 struct orf_token {
202 	struct message_header header;
203 	unsigned int seq;
204 	unsigned int token_seq;
205 	unsigned int aru;
206 	unsigned int aru_addr;
207 	struct memb_ring_id ring_id;
208 	unsigned int backlog;
209 	unsigned int fcc;
210 	int retrans_flg;
211 	int rtr_list_entries;
212 	struct rtr_item rtr_list[0];
213 }__attribute__((packed));
214 
215 
216 struct memb_join {
217 	struct message_header header;
218 	struct srp_addr system_from;
219 	unsigned int proc_list_entries;
220 	unsigned int failed_list_entries;
221 	unsigned long long ring_seq;
222 	unsigned char end_of_memb_join[0];
223 /*
224  * These parts of the data structure are dynamic:
225  * struct srp_addr proc_list[];
226  * struct srp_addr failed_list[];
227  */
228 } __attribute__((packed));
229 
230 
231 struct memb_merge_detect {
232 	struct message_header header;
233 	struct srp_addr system_from;
234 	struct memb_ring_id ring_id;
235 } __attribute__((packed));
236 
237 
238 struct token_hold_cancel {
239 	struct message_header header;
240 	struct memb_ring_id ring_id;
241 } __attribute__((packed));
242 
243 
244 struct memb_commit_token_memb_entry {
245 	struct memb_ring_id ring_id;
246 	unsigned int aru;
247 	unsigned int high_delivered;
248 	unsigned int received_flg;
249 }__attribute__((packed));
250 
251 
252 struct memb_commit_token {
253 	struct message_header header;
254 	unsigned int token_seq;
255 	struct memb_ring_id ring_id;
256 	unsigned int retrans_flg;
257 	int memb_index;
258 	int addr_entries;
259 	unsigned char end_of_commit_token[0];
260 /*
261  * These parts of the data structure are dynamic:
262  *
263  *	struct srp_addr addr[PROCESSOR_COUNT_MAX];
264  *	struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
265  */
266 }__attribute__((packed));
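
/*
 * Illustrative sketch (an assumption about how the trailing data is
 * addressed, for exposition only): the two dynamic arrays that follow the
 * fixed part of a commit token can be reached through the zero-length
 * end_of_commit_token marker, e.g.
 *
 *	struct srp_addr *addr =
 *		(struct srp_addr *)commit_token->end_of_commit_token;
 *	struct memb_commit_token_memb_entry *memb_list =
 *		(struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
 */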
267 
268 struct message_item {
269 	struct mcast *mcast;
270 	unsigned int msg_len;
271 };
272 
273 struct sort_queue_item {
274 	struct mcast *mcast;
275 	unsigned int msg_len;
276 };
277 
278 enum memb_state {
279 	MEMB_STATE_OPERATIONAL = 1,
280 	MEMB_STATE_GATHER = 2,
281 	MEMB_STATE_COMMIT = 3,
282 	MEMB_STATE_RECOVERY = 4
283 };
284 
285 struct totemsrp_instance {
286 	int iface_changes;
287 
288 	int failed_to_recv;
289 
290 	/*
291 	 * Flow control mcasts and remcasts on last and current orf_token
292 	 */
293 	int fcc_remcast_last;
294 
295 	int fcc_mcast_last;
296 
297 	int fcc_remcast_current;
298 
299 	struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
300 
301 	int consensus_list_entries;
302 
303 	struct srp_addr my_id;
304 
305 	struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];
306 
307 	struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];
308 
309 	struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];
310 
311 	struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];
312 
313 	struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];
314 
315 	struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
316 
317 	struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
318 
319 	unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX];
320 
321 	int my_proc_list_entries;
322 
323 	int my_failed_list_entries;
324 
325 	int my_new_memb_entries;
326 
327 	int my_trans_memb_entries;
328 
329 	int my_memb_entries;
330 
331 	int my_deliver_memb_entries;
332 
333 	int my_left_memb_entries;
334 
335 	int my_leave_memb_entries;
336 
337 	struct memb_ring_id my_ring_id;
338 
339 	struct memb_ring_id my_old_ring_id;
340 
341 	int my_aru_count;
342 
343 	int my_merge_detect_timeout_outstanding;
344 
345 	unsigned int my_last_aru;
346 
347 	int my_seq_unchanged;
348 
349 	int my_received_flg;
350 
351 	unsigned int my_high_seq_received;
352 
353 	unsigned int my_install_seq;
354 
355 	int my_rotation_counter;
356 
357 	int my_set_retrans_flg;
358 
359 	int my_retrans_flg_count;
360 
361 	unsigned int my_high_ring_delivered;
362 
363 	int heartbeat_timeout;
364 
365 	/*
366 	 * Queues used to order, deliver, and recover messages
367 	 */
368 	struct cs_queue new_message_queue;
369 
370 	struct cs_queue new_message_queue_trans;
371 
372 	struct cs_queue retrans_message_queue;
373 
374 	struct sq regular_sort_queue;
375 
376 	struct sq recovery_sort_queue;
377 
378 	/*
379 	 * Received up to and including
380 	 */
381 	unsigned int my_aru;
382 
383 	unsigned int my_high_delivered;
384 
385 	struct list_head token_callback_received_listhead;
386 
387 	struct list_head token_callback_sent_listhead;
388 
389 	char orf_token_retransmit[TOKEN_SIZE_MAX];
390 
391 	int orf_token_retransmit_size;
392 
393 	unsigned int my_token_seq;
394 
395 	/*
396 	 * Timers
397 	 */
398 	qb_loop_timer_handle timer_pause_timeout;
399 
400 	qb_loop_timer_handle timer_orf_token_timeout;
401 
402 	qb_loop_timer_handle timer_orf_token_retransmit_timeout;
403 
404 	qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout;
405 
406 	qb_loop_timer_handle timer_merge_detect_timeout;
407 
408 	qb_loop_timer_handle memb_timer_state_gather_join_timeout;
409 
410 	qb_loop_timer_handle memb_timer_state_gather_consensus_timeout;
411 
412 	qb_loop_timer_handle memb_timer_state_commit_timeout;
413 
414 	qb_loop_timer_handle timer_heartbeat_timeout;
415 
416 	/*
417 	 * Function and data used to log messages
418 	 */
419 	int totemsrp_log_level_security;
420 
421 	int totemsrp_log_level_error;
422 
423 	int totemsrp_log_level_warning;
424 
425 	int totemsrp_log_level_notice;
426 
427 	int totemsrp_log_level_debug;
428 
429 	int totemsrp_log_level_trace;
430 
431 	int totemsrp_subsys_id;
432 
433 	void (*totemsrp_log_printf) (
434 		int level,
435 		int subsys,
436 		const char *function,
437 		const char *file,
438 		int line,
		const char *format, ...)__attribute__((format(printf, 6, 7)));
440 
441 	enum memb_state memb_state;
442 
443 //TODO	struct srp_addr next_memb;
444 
445 	qb_loop_t *totemsrp_poll_handle;
446 
447 	struct totem_ip_address mcast_address;
448 
449 	void (*totemsrp_deliver_fn) (
450 		unsigned int nodeid,
451 		const void *msg,
452 		unsigned int msg_len,
453 		int endian_conversion_required);
454 
455 	void (*totemsrp_confchg_fn) (
456 		enum totem_configuration_type configuration_type,
457 		const unsigned int *member_list, size_t member_list_entries,
458 		const unsigned int *left_list, size_t left_list_entries,
459 		const unsigned int *joined_list, size_t joined_list_entries,
460 		const struct memb_ring_id *ring_id);
461 
	void (*totemsrp_service_ready_fn) (void);
463 
464 	void (*totemsrp_waiting_trans_ack_cb_fn) (
465 		int waiting_trans_ack);
466 
467 	void (*memb_ring_id_create_or_load) (
468 		struct memb_ring_id *memb_ring_id,
469 		const struct totem_ip_address *addr);
470 
471 	void (*memb_ring_id_store) (
472 		const struct memb_ring_id *memb_ring_id,
473 		const struct totem_ip_address *addr);
474 
475 	int global_seqno;
476 
477 	int my_token_held;
478 
479 	unsigned long long token_ring_id_seq;
480 
481 	unsigned int last_released;
482 
483 	unsigned int set_aru;
484 
485 	int old_ring_state_saved;
486 
487 	int old_ring_state_aru;
488 
489 	unsigned int old_ring_state_high_seq_received;
490 
491 	unsigned int my_last_seq;
492 
493 	struct timeval tv_old;
494 
495 	void *totemrrp_context;
496 
497 	struct totem_config *totem_config;
498 
499 	unsigned int use_heartbeat;
500 
501 	unsigned int my_trc;
502 
503 	unsigned int my_pbl;
504 
505 	unsigned int my_cbl;
506 
507 	uint64_t pause_timestamp;
508 
509 	struct memb_commit_token *commit_token;
510 
511 	totemsrp_stats_t stats;
512 
513 	uint32_t orf_token_discard;
514 
515 	uint32_t originated_orf_token;
516 
517 	uint32_t threaded_mode_enabled;
518 
519 	uint32_t waiting_trans_ack;
520 
521 	int 	flushing;
522 
523 	void * token_recv_event_handle;
524 	void * token_sent_event_handle;
525 	char commit_token_storage[40000];
526 };
527 
528 struct message_handlers {
529 	int count;
530 	int (*handler_functions[6]) (
531 		struct totemsrp_instance *instance,
532 		const void *msg,
533 		size_t msg_len,
534 		int endian_conversion_needed);
535 };
536 
537 enum gather_state_from {
538 	TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT = 0,
539 	TOTEMSRP_GSFROM_GATHER_MISSING1 = 1,
540 	TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE = 2,
541 	TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED = 3,
542 	TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE = 4,
543 	TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE = 5,
544 	TOTEMSRP_GSFROM_FAILED_TO_RECEIVE = 6,
545 	TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE = 7,
546 	TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE = 8,
547 	TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE = 9,
548 	TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE = 10,
549 	TOTEMSRP_GSFROM_MERGE_DURING_JOIN = 11,
550 	TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE = 12,
551 	TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE = 13,
552 	TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY = 14,
553 	TOTEMSRP_GSFROM_INTERFACE_CHANGE = 15,
554 	TOTEMSRP_GSFROM_MAX = TOTEMSRP_GSFROM_INTERFACE_CHANGE,
555 };
556 
557 const char* gather_state_from_desc [] = {
558 	[TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
559 	[TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
560 	[TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
561 	[TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
562 	[TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
563 	[TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
564 	[TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
565 	[TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
566 	[TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
567 	[TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
568 	[TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
569 	[TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
570 	[TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
571 	[TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
572 	[TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
573 	[TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
574 };
575 
576 /*
577  * forward decls
578  */
579 static int message_handler_orf_token (
580 	struct totemsrp_instance *instance,
581 	const void *msg,
582 	size_t msg_len,
583 	int endian_conversion_needed);
584 
585 static int message_handler_mcast (
586 	struct totemsrp_instance *instance,
587 	const void *msg,
588 	size_t msg_len,
589 	int endian_conversion_needed);
590 
591 static int message_handler_memb_merge_detect (
592 	struct totemsrp_instance *instance,
593 	const void *msg,
594 	size_t msg_len,
595 	int endian_conversion_needed);
596 
597 static int message_handler_memb_join (
598 	struct totemsrp_instance *instance,
599 	const void *msg,
600 	size_t msg_len,
601 	int endian_conversion_needed);
602 
603 static int message_handler_memb_commit_token (
604 	struct totemsrp_instance *instance,
605 	const void *msg,
606 	size_t msg_len,
607 	int endian_conversion_needed);
608 
609 static int message_handler_token_hold_cancel (
610 	struct totemsrp_instance *instance,
611 	const void *msg,
612 	size_t msg_len,
613 	int endian_conversion_needed);
614 
615 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
616 
617 static unsigned int main_msgs_missing (void);
618 
619 static void main_token_seqid_get (
620 	const void *msg,
621 	unsigned int *seqid,
622 	unsigned int *token_is);
623 
624 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
625 
626 static void srp_addr_to_nodeid (
627 	unsigned int *nodeid_out,
628 	struct srp_addr *srp_addr_in,
629 	unsigned int entries);
630 
631 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
632 
633 static void memb_leave_message_send (struct totemsrp_instance *instance);
634 
635 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
636 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
637 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *token,
639 	int fcc_mcasts_allowed);
640 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
641 
642 static void memb_ring_id_set (struct totemsrp_instance *instance,
643 	const struct memb_ring_id *ring_id);
644 static void target_set_completed (void *context);
645 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
646 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
647 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
648 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
649 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
650 static int token_hold_cancel_send (struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
652 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
653 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
654 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
655 static void memb_merge_detect_endian_convert (
656 	const struct memb_merge_detect *in,
657 	struct memb_merge_detect *out);
658 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (void *data);
660 static void timer_function_pause_timeout (void *data);
661 static void timer_function_heartbeat_timeout (void *data);
662 static void timer_function_token_retransmit_timeout (void *data);
663 static void timer_function_token_hold_retransmit_timeout (void *data);
664 static void timer_function_merge_detect_timeout (void *data);
665 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
666 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
667 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
668 
669 void main_deliver_fn (
670 	void *context,
671 	const void *msg,
672 	unsigned int msg_len);
673 
674 void main_iface_change_fn (
675 	void *context,
676 	const struct totem_ip_address *iface_address,
677 	unsigned int iface_no);
678 
679 struct message_handlers totemsrp_message_handlers = {
680 	6,
681 	{
682 		message_handler_orf_token,            /* MESSAGE_TYPE_ORF_TOKEN */
683 		message_handler_mcast,                /* MESSAGE_TYPE_MCAST */
684 		message_handler_memb_merge_detect,    /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
685 		message_handler_memb_join,            /* MESSAGE_TYPE_MEMB_JOIN */
686 		message_handler_memb_commit_token,    /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
687 		message_handler_token_hold_cancel     /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
688 	}
689 };
690 
691 #define log_printf(level, format, args...)		\
692 do {							\
693 	instance->totemsrp_log_printf (			\
694 		level, instance->totemsrp_subsys_id,	\
695 		__FUNCTION__, __FILE__, __LINE__,	\
696 		format, ##args);			\
} while (0)
698 #define LOGSYS_PERROR(err_num, level, fmt, args...)						\
699 do {												\
700 	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN];						\
701 	const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str));	\
702         instance->totemsrp_log_printf (								\
703 		level, instance->totemsrp_subsys_id,						\
704                 __FUNCTION__, __FILE__, __LINE__,						\
705 		fmt ": %s (%d)\n", ##args, _error_ptr, err_num);				\
706 	} while(0)
707 
static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
709 {
710 	if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
711 		return gather_state_from_desc[gsfrom];
712 	}
713 	else {
714 		return "UNKNOWN";
715 	}
716 }
717 
static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
719 {
720 	memset (instance, 0, sizeof (struct totemsrp_instance));
721 
722 	list_init (&instance->token_callback_received_listhead);
723 
724 	list_init (&instance->token_callback_sent_listhead);
725 
726 	instance->my_received_flg = 1;
727 
728 	instance->my_token_seq = SEQNO_START_TOKEN - 1;
729 
730 	instance->memb_state = MEMB_STATE_OPERATIONAL;
731 
732 	instance->set_aru = -1;
733 
734 	instance->my_aru = SEQNO_START_MSG;
735 
736 	instance->my_high_seq_received = SEQNO_START_MSG;
737 
738 	instance->my_high_delivered = SEQNO_START_MSG;
739 
740 	instance->orf_token_discard = 0;
741 
742 	instance->originated_orf_token = 0;
743 
744 	instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
745 
746 	instance->my_id.no_addrs = INTERFACE_MAX;
747 
748 	instance->waiting_trans_ack = 1;
749 }
750 
static void main_token_seqid_get (
752 	const void *msg,
753 	unsigned int *seqid,
754 	unsigned int *token_is)
755 {
756 	const struct orf_token *token = msg;
757 
758 	*seqid = 0;
759 	*token_is = 0;
760 	if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
761 		*seqid = token->token_seq;
762 		*token_is = 1;
763 	}
764 }
765 
static unsigned int main_msgs_missing (void)
767 {
768 // TODO
769 	return (0);
770 }
771 
static int pause_flush (struct totemsrp_instance *instance)
773 {
774 	uint64_t now_msec;
775 	uint64_t timestamp_msec;
776 	int res = 0;
777 
778         now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
779         timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
780 
781 	if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
782 		log_printf (instance->totemsrp_log_level_notice,
783 			"Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
784 		/*
785 		 * -1 indicates an error from recvmsg
786 		 */
787 		do {
788 			res = totemrrp_mcast_recv_empty (instance->totemrrp_context);
789 		} while (res == -1);
790 	}
791 	return (res);
792 }
793 
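/*
 * Note on token_event_stats_collector() below: stats.token[] is treated as a
 * circular buffer of TOTEM_TOKEN_STATS_MAX entries.  latest_token advances
 * (with wraparound) on every received token; if it catches up with
 * earliest_token, earliest_token is advanced as well and that slot's
 * counters are zeroed, so the array keeps a sliding window of the most
 * recent tokens.  The tx timestamp is cleared on receive in case the token
 * is dropped before it is sent on.
 */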
static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
795 {
796 	struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
797 	uint32_t time_now;
798 	unsigned long long nano_secs = qb_util_nano_current_get ();
799 
800 	time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
801 
802 	if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
		/* increment the latest_token index */
804 		if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
805 			instance->stats.latest_token = 0;
806 		else
807 			instance->stats.latest_token++;
808 
809 		if (instance->stats.earliest_token == instance->stats.latest_token) {
810 			/* we have filled up the array, start overwriting */
811 			if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
812 				instance->stats.earliest_token = 0;
813 			else
814 				instance->stats.earliest_token++;
815 
816 			instance->stats.token[instance->stats.earliest_token].rx = 0;
817 			instance->stats.token[instance->stats.earliest_token].tx = 0;
818 			instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
819 		}
820 
821 		instance->stats.token[instance->stats.latest_token].rx = time_now;
822 		instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
823 	} else {
824 		instance->stats.token[instance->stats.latest_token].tx = time_now;
825 	}
826 	return 0;
827 }
828 
829 /*
830  * Exported interfaces
831  */
int totemsrp_initialize (
833 	qb_loop_t *poll_handle,
834 	void **srp_context,
835 	struct totem_config *totem_config,
836 	totemmrp_stats_t *stats,
837 
838 	void (*deliver_fn) (
839 		unsigned int nodeid,
840 		const void *msg,
841 		unsigned int msg_len,
842 		int endian_conversion_required),
843 
844 	void (*confchg_fn) (
845 		enum totem_configuration_type configuration_type,
846 		const unsigned int *member_list, size_t member_list_entries,
847 		const unsigned int *left_list, size_t left_list_entries,
848 		const unsigned int *joined_list, size_t joined_list_entries,
849 		const struct memb_ring_id *ring_id),
850 	void (*waiting_trans_ack_cb_fn) (
851 		int waiting_trans_ack))
852 {
853 	struct totemsrp_instance *instance;
854 	int res;
855 
856 	instance = malloc (sizeof (struct totemsrp_instance));
857 	if (instance == NULL) {
858 		goto error_exit;
859 	}
860 
861 	totemsrp_instance_initialize (instance);
862 
863 	instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
864 	instance->totemsrp_waiting_trans_ack_cb_fn (1);
865 
866 	stats->srp = &instance->stats;
867 	instance->stats.latest_token = 0;
868 	instance->stats.earliest_token = 0;
869 
870 	instance->totem_config = totem_config;
871 
872 	/*
873 	 * Configure logging
874 	 */
875 	instance->totemsrp_log_level_security = totem_config->totem_logging_configuration.log_level_security;
876 	instance->totemsrp_log_level_error = totem_config->totem_logging_configuration.log_level_error;
877 	instance->totemsrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
878 	instance->totemsrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
879 	instance->totemsrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
880 	instance->totemsrp_log_level_trace = totem_config->totem_logging_configuration.log_level_trace;
881 	instance->totemsrp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
882 	instance->totemsrp_log_printf = totem_config->totem_logging_configuration.log_printf;
883 
884 	/*
885 	 * Configure totem store and load functions
886 	 */
887 	instance->memb_ring_id_create_or_load = totem_config->totem_memb_ring_id_create_or_load;
888 	instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
889 
890 	/*
891 	 * Initialize local variables for totemsrp
892 	 */
893 	totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
894 
895 	/*
896 	 * Display totem configuration
897 	 */
898 	log_printf (instance->totemsrp_log_level_debug,
899 		"Token Timeout (%d ms) retransmit timeout (%d ms)",
900 		totem_config->token_timeout, totem_config->token_retransmit_timeout);
901 	log_printf (instance->totemsrp_log_level_debug,
902 		"token hold (%d ms) retransmits before loss (%d retrans)",
903 		totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
904 	log_printf (instance->totemsrp_log_level_debug,
905 		"join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
906 		totem_config->join_timeout,
907 		totem_config->send_join_timeout,
		totem_config->consensus_timeout,
		totem_config->merge_timeout);
911 	log_printf (instance->totemsrp_log_level_debug,
912 		"downcheck (%d ms) fail to recv const (%d msgs)",
913 		totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
914 	log_printf (instance->totemsrp_log_level_debug,
915 		"seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916 
917 	log_printf (instance->totemsrp_log_level_debug,
918 		"window size per rotation (%d messages) maximum messages per rotation (%d messages)",
919 		totem_config->window_size, totem_config->max_messages);
920 
921 	log_printf (instance->totemsrp_log_level_debug,
922 		"missed count const (%d messages)",
923 		totem_config->miss_count_const);
924 
925 	log_printf (instance->totemsrp_log_level_debug,
926 		"send threads (%d threads)", totem_config->threads);
927 	log_printf (instance->totemsrp_log_level_debug,
928 		"RRP token expired timeout (%d ms)",
929 		totem_config->rrp_token_expired_timeout);
930 	log_printf (instance->totemsrp_log_level_debug,
931 		"RRP token problem counter (%d ms)",
932 		totem_config->rrp_problem_count_timeout);
933 	log_printf (instance->totemsrp_log_level_debug,
934 		"RRP threshold (%d problem count)",
935 		totem_config->rrp_problem_count_threshold);
936 	log_printf (instance->totemsrp_log_level_debug,
937 		"RRP multicast threshold (%d problem count)",
938 		totem_config->rrp_problem_count_mcast_threshold);
939 	log_printf (instance->totemsrp_log_level_debug,
940 		"RRP automatic recovery check timeout (%d ms)",
941 		totem_config->rrp_autorecovery_check_timeout);
942 	log_printf (instance->totemsrp_log_level_debug,
943 		"RRP mode set to %s.", instance->totem_config->rrp_mode);
944 
945 	log_printf (instance->totemsrp_log_level_debug,
946 		"heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
947 	log_printf (instance->totemsrp_log_level_debug,
948 		"max_network_delay (%d ms)", totem_config->max_network_delay);
949 
950 
951 	cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
952 		sizeof (struct message_item), instance->threaded_mode_enabled);
953 
954 	sq_init (&instance->regular_sort_queue,
955 		QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
956 
957 	sq_init (&instance->recovery_sort_queue,
958 		QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
959 
960 	instance->totemsrp_poll_handle = poll_handle;
961 
962 	instance->totemsrp_deliver_fn = deliver_fn;
963 
964 	instance->totemsrp_confchg_fn = confchg_fn;
965 	instance->use_heartbeat = 1;
966 
967 	timer_function_pause_timeout (instance);
968 
969 	if ( totem_config->heartbeat_failures_allowed == 0 ) {
970 		log_printf (instance->totemsrp_log_level_debug,
971 			"HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
972 		instance->use_heartbeat = 0;
973 	}
974 
975 	if (instance->use_heartbeat) {
976 		instance->heartbeat_timeout
977 			= (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
978 				+ totem_config->max_network_delay;
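		/*
		 * Worked example with hypothetical values: heartbeat_failures_allowed = 3,
		 * token_retransmit_timeout = 238 ms and max_network_delay = 50 ms give a
		 * heartbeat_timeout of 3 * 238 + 50 = 764 ms, which must stay below
		 * token_timeout for heartbeat to stay enabled (checked just below).
		 */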
979 
980 		if (instance->heartbeat_timeout >= totem_config->token_timeout) {
981 			log_printf (instance->totemsrp_log_level_debug,
982 				"total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
983 				instance->heartbeat_timeout,
984 				totem_config->token_timeout);
985 			log_printf (instance->totemsrp_log_level_debug,
986 				"heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987 			log_printf (instance->totemsrp_log_level_debug,
988 				"heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
989 			instance->use_heartbeat = 0;
990 		}
991 		else {
992 			log_printf (instance->totemsrp_log_level_debug,
993 				"total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
994 		}
995 	}
996 
997 	res = totemrrp_initialize (
998 		poll_handle,
999 		&instance->totemrrp_context,
1000 		totem_config,
1001 		stats->srp,
1002 		instance,
1003 		main_deliver_fn,
1004 		main_iface_change_fn,
1005 		main_token_seqid_get,
1006 		main_msgs_missing,
1007 		target_set_completed);
1008 	if (res == -1) {
1009 		goto error_exit;
1010 	}
1011 
1012 	/*
1013 	 * Must have net_mtu adjusted by totemrrp_initialize first
1014 	 */
1015 	cs_queue_init (&instance->new_message_queue,
1016 		MESSAGE_QUEUE_MAX,
1017 		sizeof (struct message_item), instance->threaded_mode_enabled);
1018 
1019 	cs_queue_init (&instance->new_message_queue_trans,
1020 		MESSAGE_QUEUE_MAX,
1021 		sizeof (struct message_item), instance->threaded_mode_enabled);
1022 
1023 	totemsrp_callback_token_create (instance,
1024 		&instance->token_recv_event_handle,
1025 		TOTEM_CALLBACK_TOKEN_RECEIVED,
1026 		0,
1027 		token_event_stats_collector,
1028 		instance);
1029 	totemsrp_callback_token_create (instance,
1030 		&instance->token_sent_event_handle,
1031 		TOTEM_CALLBACK_TOKEN_SENT,
1032 		0,
1033 		token_event_stats_collector,
1034 		instance);
1035 	*srp_context = instance;
1036 	return (0);
1037 
1038 error_exit:
1039 	return (-1);
1040 }
1041 
void totemsrp_finalize (
1043 	void *srp_context)
1044 {
1045 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1046 
1047 
1048 	memb_leave_message_send (instance);
1049 	totemrrp_finalize (instance->totemrrp_context);
1050 	cs_queue_free (&instance->new_message_queue);
1051 	cs_queue_free (&instance->new_message_queue_trans);
1052 	cs_queue_free (&instance->retrans_message_queue);
1053 	sq_free (&instance->regular_sort_queue);
1054 	sq_free (&instance->recovery_sort_queue);
1055 	free (instance);
1056 }
1057 
/*
 * Return configured interfaces. interfaces is an array of totem_ip addresses
 * allocated by the caller, with interfaces_size items. iface_count is set to
 * the final number of interfaces filled in by this function.
 *
 * Returns 0 on success, -2 if the interfaces array is not big enough, and -1
 * if the node was not found.
 */
int totemsrp_ifaces_get (
1067 	void *srp_context,
1068 	unsigned int nodeid,
1069 	struct totem_ip_address *interfaces,
1070 	unsigned int interfaces_size,
1071 	char ***status,
1072 	unsigned int *iface_count)
1073 {
1074 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1075 	int res = 0;
1076 	unsigned int found = 0;
1077 	unsigned int i;
1078 
1079 	for (i = 0; i < instance->my_memb_entries; i++) {
1080 		if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
1081 			found = 1;
1082 			break;
1083 		}
1084 	}
1085 
1086 	if (found) {
1087 		*iface_count = instance->totem_config->interface_count;
1088 
1089 		if (interfaces_size >= *iface_count) {
1090 			memcpy (interfaces, instance->my_memb_list[i].addr,
1091 				sizeof (struct totem_ip_address) * *iface_count);
1092 		} else {
1093 			res = -2;
1094 		}
1095 
1096 		goto finish;
1097 	}
1098 
1099 	for (i = 0; i < instance->my_left_memb_entries; i++) {
1100 		if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
1101 			found = 1;
1102 			break;
1103 		}
1104 	}
1105 
1106 	if (found) {
1107 		*iface_count = instance->totem_config->interface_count;
1108 
1109 		if (interfaces_size >= *iface_count) {
1110 			memcpy (interfaces, instance->my_left_memb_list[i].addr,
1111 				sizeof (struct totem_ip_address) * *iface_count);
1112 		} else {
1113 			res = -2;
1114 		}
1115 	} else {
1116 		res = -1;
1117 	}
1118 
1119 finish:
1120 	totemrrp_ifaces_get (instance->totemrrp_context, status, NULL);
1121 	return (res);
1122 }
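
/*
 * Illustrative caller sketch (hypothetical code, not part of this file),
 * showing the return-code contract documented above:
 *
 *	struct totem_ip_address ifaces[INTERFACE_MAX];
 *	char **status;
 *	unsigned int iface_count;
 *	int res;
 *
 *	res = totemsrp_ifaces_get (srp_context, nodeid, ifaces,
 *		INTERFACE_MAX, &status, &iface_count);
 *	if (res == 0) {
 *		... iface_count entries of ifaces[] are valid ...
 *	} else if (res == -2) {
 *		... interfaces array too small, retry with a larger one ...
 *	} else {
 *		... nodeid is not a current or recently departed member ...
 *	}
 */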
1123 
int totemsrp_crypto_set (
1125 	void *srp_context,
1126 	const char *cipher_type,
1127 	const char *hash_type)
1128 {
1129 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1130 	int res;
1131 
1132 	res = totemrrp_crypto_set(instance->totemrrp_context, cipher_type, hash_type);
1133 
1134 	return (res);
1135 }
1136 
1137 
unsigned int totemsrp_my_nodeid_get (
1139 	void *srp_context)
1140 {
1141 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1142 	unsigned int res;
1143 
1144 	res = instance->totem_config->interfaces[0].boundto.nodeid;
1145 
1146 	return (res);
1147 }
1148 
int totemsrp_my_family_get (
1150 	void *srp_context)
1151 {
1152 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1153 	int res;
1154 
1155 	res = instance->totem_config->interfaces[0].boundto.family;
1156 
1157 	return (res);
1158 }
1159 
1160 
int totemsrp_ring_reenable (
1162         void *srp_context)
1163 {
1164 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1165 
1166 	totemrrp_ring_reenable (instance->totemrrp_context,
1167 		instance->totem_config->interface_count);
1168 
1169 	return (0);
1170 }
1171 
1172 
1173 /*
1174  * Set operations for use by the membership algorithm
1175  */
static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1177 {
1178 	unsigned int i;
1179 	unsigned int res;
1180 
1181 	for (i = 0; i < 1; i++) {
1182 		res = totemip_equal (&a->addr[i], &b->addr[i]);
1183 		if (res == 0) {
1184 			return (0);
1185 		}
1186 	}
1187 	return (1);
1188 }
1189 
static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1191 {
1192 	unsigned int i;
1193 
1194 	dest->no_addrs = src->no_addrs;
1195 
1196 	for (i = 0; i < INTERFACE_MAX; i++) {
1197 		totemip_copy (&dest->addr[i], &src->addr[i]);
1198 	}
1199 }
1200 
static void srp_addr_to_nodeid (
1202 	unsigned int *nodeid_out,
1203 	struct srp_addr *srp_addr_in,
1204 	unsigned int entries)
1205 {
1206 	unsigned int i;
1207 
1208 	for (i = 0; i < entries; i++) {
1209 		nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
1210 	}
1211 }
1212 
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1214 {
1215 	int i;
1216 
1217 	for (i = 0; i < INTERFACE_MAX; i++) {
1218 		totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
1219 	}
1220 }
1221 
static void memb_consensus_reset (struct totemsrp_instance *instance)
1223 {
1224 	instance->consensus_list_entries = 0;
1225 }
1226 
static void memb_set_subtract (
1228         struct srp_addr *out_list, int *out_list_entries,
1229         struct srp_addr *one_list, int one_list_entries,
1230         struct srp_addr *two_list, int two_list_entries)
1231 {
1232 	int found = 0;
1233 	int i;
1234 	int j;
1235 
1236 	*out_list_entries = 0;
1237 
1238 	for (i = 0; i < one_list_entries; i++) {
1239 		for (j = 0; j < two_list_entries; j++) {
1240 			if (srp_addr_equal (&one_list[i], &two_list[j])) {
1241 				found = 1;
1242 				break;
1243 			}
1244 		}
1245 		if (found == 0) {
1246 			srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1247 			*out_list_entries = *out_list_entries + 1;
1248 		}
1249 		found = 0;
1250 	}
1251 }
1252 
1253 /*
1254  * Set consensus for a specific processor
1255  */
static void memb_consensus_set (
1257 	struct totemsrp_instance *instance,
1258 	const struct srp_addr *addr)
1259 {
1260 	int found = 0;
1261 	int i;
1262 
1263 	if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
1264 	        return;
1265 
1266 	for (i = 0; i < instance->consensus_list_entries; i++) {
1267 		if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1268 			found = 1;
1269 			break; /* found entry */
1270 		}
1271 	}
1272 	srp_addr_copy (&instance->consensus_list[i].addr, addr);
1273 	instance->consensus_list[i].set = 1;
1274 	if (found == 0) {
1275 		instance->consensus_list_entries++;
1276 	}
1277 	return;
1278 }
1279 
1280 /*
1281  * Is consensus set for a specific processor
1282  */
static int memb_consensus_isset (
1284 	struct totemsrp_instance *instance,
1285 	const struct srp_addr *addr)
1286 {
1287 	int i;
1288 
1289 	for (i = 0; i < instance->consensus_list_entries; i++) {
1290 		if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1291 			return (instance->consensus_list[i].set);
1292 		}
1293 	}
1294 	return (0);
1295 }
1296 
1297 /*
 * Is consensus agreed upon, based on the consensus database?
1299  */
static int memb_consensus_agreed (
1301 	struct totemsrp_instance *instance)
1302 {
1303 	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1304 	int token_memb_entries = 0;
1305 	int agreed = 1;
1306 	int i;
1307 
1308 	memb_set_subtract (token_memb, &token_memb_entries,
1309 		instance->my_proc_list, instance->my_proc_list_entries,
1310 		instance->my_failed_list, instance->my_failed_list_entries);
1311 
1312 	for (i = 0; i < token_memb_entries; i++) {
1313 		if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1314 			agreed = 0;
1315 			break;
1316 		}
1317 	}
1318 
1319 	if (agreed && instance->failed_to_recv == 1) {
1320 		/*
		 * Both nodes agreed on our failure. We don't care how many proc list
		 * items are left, because we will create a single-node ring anyway.
		 */

		return (agreed);
1326 	}
1327 
1328 	assert (token_memb_entries >= 1);
1329 
1330 	return (agreed);
1331 }
1332 
static void memb_consensus_notset (
1334 	struct totemsrp_instance *instance,
1335 	struct srp_addr *no_consensus_list,
1336 	int *no_consensus_list_entries,
1337 	struct srp_addr *comparison_list,
1338 	int comparison_list_entries)
1339 {
1340 	int i;
1341 
1342 	*no_consensus_list_entries = 0;
1343 
1344 	for (i = 0; i < instance->my_proc_list_entries; i++) {
1345 		if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1346 			srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1347 			*no_consensus_list_entries = *no_consensus_list_entries + 1;
1348 		}
1349 	}
1350 }
1351 
1352 /*
 * Is set1 equal to set2? Entries can be in different orders.
1354  */
static int memb_set_equal (
1356 	struct srp_addr *set1, int set1_entries,
1357 	struct srp_addr *set2, int set2_entries)
1358 {
1359 	int i;
1360 	int j;
1361 
1362 	int found = 0;
1363 
1364 	if (set1_entries != set2_entries) {
1365 		return (0);
1366 	}
1367 	for (i = 0; i < set2_entries; i++) {
1368 		for (j = 0; j < set1_entries; j++) {
1369 			if (srp_addr_equal (&set1[j], &set2[i])) {
1370 				found = 1;
1371 				break;
1372 			}
1373 		}
1374 		if (found == 0) {
1375 			return (0);
1376 		}
1377 		found = 0;
1378 	}
1379 	return (1);
1380 }
1381 
1382 /*
1383  * Is subset fully contained in fullset
1384  */
static int memb_set_subset (
1386 	const struct srp_addr *subset, int subset_entries,
1387 	const struct srp_addr *fullset, int fullset_entries)
1388 {
1389 	int i;
1390 	int j;
1391 	int found = 0;
1392 
1393 	if (subset_entries > fullset_entries) {
1394 		return (0);
1395 	}
1396 	for (i = 0; i < subset_entries; i++) {
1397 		for (j = 0; j < fullset_entries; j++) {
1398 			if (srp_addr_equal (&subset[i], &fullset[j])) {
1399 				found = 1;
1400 			}
1401 		}
1402 		if (found == 0) {
1403 			return (0);
1404 		}
1405 		found = 0;
1406 	}
1407 	return (1);
1408 }
1409 /*
 * Merge subset into fullset, taking care not to add duplicates.
1411  */
static void memb_set_merge (
1413 	const struct srp_addr *subset, int subset_entries,
1414 	struct srp_addr *fullset, int *fullset_entries)
1415 {
1416 	int found = 0;
1417 	int i;
1418 	int j;
1419 
1420 	for (i = 0; i < subset_entries; i++) {
1421 		for (j = 0; j < *fullset_entries; j++) {
1422 			if (srp_addr_equal (&fullset[j], &subset[i])) {
1423 				found = 1;
1424 				break;
1425 			}
1426 		}
1427 		if (found == 0) {
1428 			srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1429 			*fullset_entries = *fullset_entries + 1;
1430 		}
1431 		found = 0;
1432 	}
1433 	return;
1434 }
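
/*
 * Worked example (hypothetical members A..D, for exposition only): with
 * one[] = { A, B, C } (3 entries) and two[] = { B, D } (2 entries):
 *
 *	memb_set_subtract (out, &n, one, 3, two, 2)   leaves out[] = { A, C }
 *	memb_set_equal (one, 3, two, 2)               returns 0 (sizes differ)
 *	memb_set_subset (two, 2, one, 3)              returns 0 (D not in one)
 *	memb_set_merge (two, 2, one, &one_entries)    grows one[] to { A, B, C, D }
 */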
1435 
static void memb_set_and_with_ring_id (
1437 	struct srp_addr *set1,
1438 	struct memb_ring_id *set1_ring_ids,
1439 	int set1_entries,
1440 	struct srp_addr *set2,
1441 	int set2_entries,
1442 	struct memb_ring_id *old_ring_id,
1443 	struct srp_addr *and,
1444 	int *and_entries)
1445 {
1446 	int i;
1447 	int j;
1448 	int found = 0;
1449 
1450 	*and_entries = 0;
1451 
1452 	for (i = 0; i < set2_entries; i++) {
1453 		for (j = 0; j < set1_entries; j++) {
1454 			if (srp_addr_equal (&set1[j], &set2[i])) {
1455 				if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1456 					found = 1;
1457 				}
1458 				break;
1459 			}
1460 		}
1461 		if (found) {
1462 			srp_addr_copy (&and[*and_entries], &set1[j]);
1463 			*and_entries = *and_entries + 1;
1464 		}
1465 		found = 0;
1466 	}
1467 	return;
1468 }
1469 
1470 #ifdef CODE_COVERAGE
static void memb_set_print (
1472 	char *string,
1473         struct srp_addr *list,
1474 	int list_entries)
1475 {
1476 	int i;
1477 	int j;
1478 	printf ("List '%s' contains %d entries:\n", string, list_entries);
1479 
1480 	for (i = 0; i < list_entries; i++) {
1481 		printf ("Address %d with %d rings\n", i, list[i].no_addrs);
1482 		for (j = 0; j < list[i].no_addrs; j++) {
1483 			printf ("\tiface %d %s\n", j, totemip_print (&list[i].addr[j]));
1484 			printf ("\tfamily %d\n", list[i].addr[j].family);
1485 		}
1486 	}
1487 }
1488 #endif
static void my_leave_memb_clear(
1490         struct totemsrp_instance *instance)
1491 {
1492         memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1493         instance->my_leave_memb_entries = 0;
1494 }
1495 
static unsigned int my_leave_memb_match(
1497         struct totemsrp_instance *instance,
1498         unsigned int nodeid)
1499 {
1500         int i;
1501         unsigned int ret = 0;
1502 
1503         for (i = 0; i < instance->my_leave_memb_entries; i++){
1504                 if (instance->my_leave_memb_list[i] ==  nodeid){
1505                         ret = nodeid;
1506                         break;
1507                 }
1508         }
1509         return ret;
1510 }
1511 
static void my_leave_memb_set(
1513         struct totemsrp_instance *instance,
1514         unsigned int nodeid)
1515 {
1516         int i, found = 0;
1517         for (i = 0; i < instance->my_leave_memb_entries; i++){
1518                 if (instance->my_leave_memb_list[i] ==  nodeid){
1519                         found = 1;
1520                         break;
1521                 }
1522         }
1523         if (found == 1) {
1524                 return;
1525         }
1526         if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1527                 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1528                 instance->my_leave_memb_entries++;
1529         } else {
1530                 log_printf (instance->totemsrp_log_level_warning,
1531                         "Cannot set LEAVE nodeid=%d", nodeid);
1532         }
1533 }
1534 
1535 
static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1537 {
1538 	assert (instance != NULL);
1539 	return totemrrp_buffer_alloc (instance->totemrrp_context);
1540 }
1541 
static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1543 {
1544 	assert (instance != NULL);
1545 	totemrrp_buffer_release (instance->totemrrp_context, ptr);
1546 }
1547 
static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1549 {
1550 	int32_t res;
1551 
1552 	qb_loop_timer_del (instance->totemsrp_poll_handle,
1553 		instance->timer_orf_token_retransmit_timeout);
1554 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1555 		QB_LOOP_MED,
1556 		instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1557 		(void *)instance,
1558 		timer_function_token_retransmit_timeout,
1559 		&instance->timer_orf_token_retransmit_timeout);
1560 	if (res != 0) {
1561 		log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1562 	}
1563 
1564 }
1565 
static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1567 {
1568 	int32_t res;
1569 
1570 	if (instance->my_merge_detect_timeout_outstanding == 0) {
1571 		res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1572 			QB_LOOP_MED,
1573 			instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1574 			(void *)instance,
1575 			timer_function_merge_detect_timeout,
1576 			&instance->timer_merge_detect_timeout);
1577 		if (res != 0) {
1578 			log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1579 		}
1580 
1581 		instance->my_merge_detect_timeout_outstanding = 1;
1582 	}
1583 }
1584 
static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1586 {
1587 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1588 	instance->my_merge_detect_timeout_outstanding = 0;
1589 }
1590 
1591 /*
1592  * ring_state_* is used to save and restore the sort queue
1593  * state when a recovery operation fails (and enters gather)
1594  */
static void old_ring_state_save (struct totemsrp_instance *instance)
1596 {
1597 	if (instance->old_ring_state_saved == 0) {
1598 		instance->old_ring_state_saved = 1;
1599 		memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1600 			sizeof (struct memb_ring_id));
1601 		instance->old_ring_state_aru = instance->my_aru;
1602 		instance->old_ring_state_high_seq_received = instance->my_high_seq_received;
1603 		log_printf (instance->totemsrp_log_level_debug,
1604 			"Saving state aru %x high seq received %x",
1605 			instance->my_aru, instance->my_high_seq_received);
1606 	}
1607 }
1608 
static void old_ring_state_restore (struct totemsrp_instance *instance)
1610 {
1611 	instance->my_aru = instance->old_ring_state_aru;
1612 	instance->my_high_seq_received = instance->old_ring_state_high_seq_received;
1613 	log_printf (instance->totemsrp_log_level_debug,
1614 		"Restoring instance->my_aru %x my high seq received %x",
1615 		instance->my_aru, instance->my_high_seq_received);
1616 }
1617 
static void old_ring_state_reset (struct totemsrp_instance *instance)
1619 {
1620 	log_printf (instance->totemsrp_log_level_debug,
1621 		"Resetting old ring state");
1622 	instance->old_ring_state_saved = 0;
1623 }
1624 
static void reset_pause_timeout (struct totemsrp_instance *instance)
1626 {
1627 	int32_t res;
1628 
1629 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1630 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1631 		QB_LOOP_MED,
1632 		instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1633 		(void *)instance,
1634 		timer_function_pause_timeout,
1635 		&instance->timer_pause_timeout);
1636 	if (res != 0) {
1637 		log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1638 	}
1639 }
1640 
static void reset_token_timeout (struct totemsrp_instance *instance) {
1642 	int32_t res;
1643 
1644 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1645 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1646 		QB_LOOP_MED,
1647 		instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1648 		(void *)instance,
1649 		timer_function_orf_token_timeout,
1650 		&instance->timer_orf_token_timeout);
1651 	if (res != 0) {
1652 		log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1653 	}
1654 }
1655 
1656 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1657 	int32_t res;
1658 
1659 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1660 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1661 		QB_LOOP_MED,
1662 		instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1663 		(void *)instance,
1664 		timer_function_heartbeat_timeout,
1665 		&instance->timer_heartbeat_timeout);
1666 	if (res != 0) {
1667 		log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1668 	}
1669 }
1670 
1671 
1672 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1673 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1674 }
1675 
1676 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1677 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1678 }
1679 
1680 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1681 {
1682 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1683 }
1684 
1685 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1686 {
1687 	int32_t res;
1688 
1689 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1690 		QB_LOOP_MED,
1691 		instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1692 		(void *)instance,
1693 		timer_function_token_hold_retransmit_timeout,
1694 		&instance->timer_orf_token_hold_retransmit_timeout);
1695 	if (res != 0) {
1696 		log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1697 	}
1698 }
1699 
1700 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1701 {
1702 	qb_loop_timer_del (instance->totemsrp_poll_handle,
1703 		instance->timer_orf_token_hold_retransmit_timeout);
1704 }
1705 
1706 static void memb_state_consensus_timeout_expired (
1707 		struct totemsrp_instance *instance)
1708 {
1709 	struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1710 	int no_consensus_list_entries;
1711 
1712 	instance->stats.consensus_timeouts++;
1713 	if (memb_consensus_agreed (instance)) {
1714 		memb_consensus_reset (instance);
1715 
1716 		memb_consensus_set (instance, &instance->my_id);
1717 
1718 		reset_token_timeout (instance); // REVIEWED
1719 	} else {
1720 		memb_consensus_notset (
1721 			instance,
1722 			no_consensus_list,
1723 			&no_consensus_list_entries,
1724 			instance->my_proc_list,
1725 			instance->my_proc_list_entries);
1726 
1727 		memb_set_merge (no_consensus_list, no_consensus_list_entries,
1728 			instance->my_failed_list, &instance->my_failed_list_entries);
1729 		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1730 	}
1731 }
1732 
1733 static void memb_join_message_send (struct totemsrp_instance *instance);
1734 
1735 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1736 
1737 /*
1738  * Timers used for various states of the membership algorithm
1739  */
1740 static void timer_function_pause_timeout (void *data)
1741 {
1742 	struct totemsrp_instance *instance = data;
1743 
1744 	instance->pause_timestamp = qb_util_nano_current_get ();
1745 	reset_pause_timeout (instance);
1746 }
1747 
1748 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1749 {
1750 	old_ring_state_restore (instance);
1751 	memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1752 	instance->stats.recovery_token_lost++;
1753 }
1754 
1755 static void timer_function_orf_token_timeout (void *data)
1756 {
1757 	struct totemsrp_instance *instance = data;
1758 
1759 	switch (instance->memb_state) {
1760 		case MEMB_STATE_OPERATIONAL:
1761 			log_printf (instance->totemsrp_log_level_debug,
1762 				"The token was lost in the OPERATIONAL state.");
1763 			log_printf (instance->totemsrp_log_level_notice,
1764 				"A processor failed, forming new configuration.");
1765 			totemrrp_iface_check (instance->totemrrp_context);
1766 			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1767 			instance->stats.operational_token_lost++;
1768 			break;
1769 
1770 		case MEMB_STATE_GATHER:
1771 			log_printf (instance->totemsrp_log_level_debug,
1772 				"The consensus timeout expired.");
1773 			memb_state_consensus_timeout_expired (instance);
1774 			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1775 			instance->stats.gather_token_lost++;
1776 			break;
1777 
1778 		case MEMB_STATE_COMMIT:
1779 			log_printf (instance->totemsrp_log_level_debug,
1780 				"The token was lost in the COMMIT state.");
1781 			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1782 			instance->stats.commit_token_lost++;
1783 			break;
1784 
1785 		case MEMB_STATE_RECOVERY:
1786 			log_printf (instance->totemsrp_log_level_debug,
1787 				"The token was lost in the RECOVERY state.");
1788 			memb_recovery_state_token_loss (instance);
1789 			instance->orf_token_discard = 1;
1790 			break;
1791 	}
1792 }
1793 
1794 static void timer_function_heartbeat_timeout (void *data)
1795 {
1796 	struct totemsrp_instance *instance = data;
1797 	log_printf (instance->totemsrp_log_level_debug,
1798 		"HeartBeat Timer expired. Invoking token loss mechanism in state %d ", instance->memb_state);
1799 	timer_function_orf_token_timeout(data);
1800 }
1801 
1802 static void memb_timer_function_state_gather (void *data)
1803 {
1804 	struct totemsrp_instance *instance = data;
1805 	int32_t res;
1806 
1807 	switch (instance->memb_state) {
1808 	case MEMB_STATE_OPERATIONAL:
1809 	case MEMB_STATE_RECOVERY:
1810 		assert (0); /* this should never happen */
1811 		break;
1812 	case MEMB_STATE_GATHER:
1813 	case MEMB_STATE_COMMIT:
1814 		memb_join_message_send (instance);
1815 
1816 		/*
1817 		 * Restart the join timeout
1818 		 */
1819 		qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1820 
1821 		res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1822 			QB_LOOP_MED,
1823 			instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1824 			(void *)instance,
1825 			memb_timer_function_state_gather,
1826 			&instance->memb_timer_state_gather_join_timeout);
1827 
1828 		if (res != 0) {
1829 			log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1830 		}
1831 		break;
1832 	}
1833 }
1834 
1835 static void memb_timer_function_gather_consensus_timeout (void *data)
1836 {
1837 	struct totemsrp_instance *instance = data;
1838 	memb_state_consensus_timeout_expired (instance);
1839 }
1840 
1841 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1842 {
1843 	unsigned int i;
1844 	struct sort_queue_item *recovery_message_item;
1845 	struct sort_queue_item regular_message_item;
1846 	unsigned int range = 0;
1847 	int res;
1848 	void *ptr;
1849 	struct mcast *mcast;
1850 
1851 	log_printf (instance->totemsrp_log_level_debug,
1852 		"recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1853 
1854 	range = instance->my_aru - SEQNO_START_MSG;
1855 	/*
1856 	 * Move messages from recovery to regular sort queue
1857 	 */
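	/*
	 * range counts the messages numbered SEQNO_START_MSG + 1 through
	 * instance->my_aru, so indexing with i + SEQNO_START_MSG for
	 * i = 1..range walks exactly the span logged above.
	 */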
1858 // todo should i be initialized to 0 or 1 ?
1859 	for (i = 1; i <= range; i++) {
1860 		res = sq_item_get (&instance->recovery_sort_queue,
1861 			i + SEQNO_START_MSG, &ptr);
1862 		if (res != 0) {
1863 			continue;
1864 		}
1865 		recovery_message_item = ptr;
1866 
1867 		/*
1868 		 * Convert recovery message into regular message
1869 		 */
1870 		mcast = recovery_message_item->mcast;
1871 		if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1872 			/*
1873 			 * Message is a recovery message encapsulated
1874 			 * in a new ring message
1875 			 */
1876 			regular_message_item.mcast =
1877 				(struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1878 			regular_message_item.msg_len =
1879 			recovery_message_item->msg_len - sizeof (struct mcast);
1880 			mcast = regular_message_item.mcast;
1881 		} else {
1882 			/*
1883 			 * TODO this case shouldn't happen
1884 			 */
1885 			continue;
1886 		}
1887 
1888 		log_printf (instance->totemsrp_log_level_debug,
1889 			"comparing if ring id is for this processor's old ring seqno %d",
1890 			 mcast->seq);
1891 
1892 		/*
1893 		 * Only add this message to the regular sort
1894 		 * queue if it was originated with the same ring
1895 		 * id as the previous ring
1896 		 */
1897 		if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1898 			sizeof (struct memb_ring_id)) == 0) {
1899 
1900 			res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1901 			if (res == 0) {
1902 				sq_item_add (&instance->regular_sort_queue,
1903 					&regular_message_item, mcast->seq);
1904 				if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1905 					instance->old_ring_state_high_seq_received = mcast->seq;
1906 				}
1907 			}
1908 		} else {
1909 			log_printf (instance->totemsrp_log_level_debug,
1910 				"-not adding msg with seq no %x", mcast->seq);
1911 		}
1912 	}
1913 }
1914 
1915 /*
1916  * Change states in the state machine of the membership algorithm
1917  */
1918 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1919 {
1920 	struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1921 	int joined_list_entries = 0;
1922 	unsigned int aru_save;
1923 	unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1924 	unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1925 	unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1926 	unsigned int left_list[PROCESSOR_COUNT_MAX];
1927 	unsigned int i;
1928 	unsigned int res;
1929 	char left_node_msg[1024];
1930 	char joined_node_msg[1024];
1931 	char failed_node_msg[1024];
1932 
1933 	instance->originated_orf_token = 0;
1934 
1935 	memb_consensus_reset (instance);
1936 
1937 	old_ring_state_reset (instance);
1938 
1939 	deliver_messages_from_recovery_to_regular (instance);
1940 
1941 	log_printf (instance->totemsrp_log_level_trace,
1942 		"Delivering to app %x to %x",
1943 		instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1944 
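	/*
	 * Temporarily adopt the old ring's aru so the remaining messages from
	 * the previous ring are delivered to the application before and after
	 * the transitional configuration; my_aru is restored from aru_save
	 * once that delivery is finished.
	 */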
1945 	aru_save = instance->my_aru;
1946 	instance->my_aru = instance->old_ring_state_aru;
1947 
1948 	messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1949 
1950 	/*
1951 	 * Calculate joined and left list
1952 	 */
1953 	memb_set_subtract (instance->my_left_memb_list,
1954 		&instance->my_left_memb_entries,
1955 		instance->my_memb_list, instance->my_memb_entries,
1956 		instance->my_trans_memb_list, instance->my_trans_memb_entries);
1957 
1958 	memb_set_subtract (joined_list, &joined_list_entries,
1959 		instance->my_new_memb_list, instance->my_new_memb_entries,
1960 		instance->my_trans_memb_list, instance->my_trans_memb_entries);
1961 
1962 	/*
1963 	 * Install new membership
1964 	 */
1965 	instance->my_memb_entries = instance->my_new_memb_entries;
1966 	memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1967 		sizeof (struct srp_addr) * instance->my_memb_entries);
1968 	instance->last_released = 0;
1969 	instance->my_set_retrans_flg = 0;
1970 
1971 	/*
1972 	 * Inform RRP about transitional change
1973 	 */
1974 	totemrrp_membership_changed (
1975 		instance->totemrrp_context,
1976 		TOTEM_CONFIGURATION_TRANSITIONAL,
1977 		instance->my_trans_memb_list, instance->my_trans_memb_entries,
1978 		instance->my_left_memb_list, instance->my_left_memb_entries,
1979 		NULL, 0,
1980 		&instance->my_ring_id);
1981 	/*
1982 	 * Deliver transitional configuration to application
1983 	 */
1984 	srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
1985 		instance->my_left_memb_entries);
1986 	srp_addr_to_nodeid (trans_memb_list_totemip,
1987 		instance->my_trans_memb_list, instance->my_trans_memb_entries);
1988 	instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_TRANSITIONAL,
1989 		trans_memb_list_totemip, instance->my_trans_memb_entries,
1990 		left_list, instance->my_left_memb_entries,
1991 		0, 0, &instance->my_ring_id);
1992 	instance->waiting_trans_ack = 1;
1993 	instance->totemsrp_waiting_trans_ack_cb_fn (1);
1994 
1995 // TODO we need to filter to ensure we only deliver those
1996 // messages which are part of instance->my_deliver_memb
1997 	messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1998 
1999 	instance->my_aru = aru_save;
2000 
2001 	/*
2002 	 * Inform RRP about regular membership change
2003 	 */
2004 	totemrrp_membership_changed (
2005 		instance->totemrrp_context,
2006 		TOTEM_CONFIGURATION_REGULAR,
2007 		instance->my_new_memb_list, instance->my_new_memb_entries,
2008 		NULL, 0,
2009 		joined_list, joined_list_entries,
2010 		&instance->my_ring_id);
2011 	/*
2012 	 * Deliver regular configuration to application
2013 	 */
2014 	srp_addr_to_nodeid (new_memb_list_totemip,
2015 		instance->my_new_memb_list, instance->my_new_memb_entries);
2016 	srp_addr_to_nodeid (joined_list_totemip, joined_list,
2017 		joined_list_entries);
2018 	instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_REGULAR,
2019 		new_memb_list_totemip, instance->my_new_memb_entries,
2020 		0, 0,
2021 		joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2022 
2023 	/*
2024 	 * The recovery sort queue now becomes the regular
2025 	 * sort queue.  It is necessary to copy the state
2026 	 * into the regular sort queue.
2027 	 */
2028 	sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2029 	instance->my_last_aru = SEQNO_START_MSG;
2030 
2031 	/* When making my_proc_list smaller, ensure that the
2032 	 * now non-used entries are zeroed out. There are some suspect
2033 	 * asserts that assume there are always 2 entries in the list.
2034 	 * These fail when my_proc_list is reduced to 1 entry (and the
2035 	 * valid [0] entry is the same as the 'unused' [1] entry).
2036 	 */
2037 	memset(instance->my_proc_list, 0,
2038 		   sizeof (struct srp_addr) * instance->my_proc_list_entries);
2039 
2040 	instance->my_proc_list_entries = instance->my_new_memb_entries;
2041 	memcpy (instance->my_proc_list, instance->my_new_memb_list,
2042 		sizeof (struct srp_addr) * instance->my_memb_entries);
2043 
2044 	instance->my_failed_list_entries = 0;
2045 	/*
2046 	 * TODO Not exactly to spec
2047 	 *
2048 	 * At the entry to this function all messages without a gap are
2049 	 * delivered.
2050 	 *
2051 	 * This code throws away messages from the last gap in the sort queue
2052 	 * to my_high_seq_received
2053 	 *
2054 	 * What should really happen is we should deliver all messages up to
2055 	 * a gap, then deliver the transitional configuration, then deliver
2056 	 * the messages between the first gap and my_high_seq_received, then
2057 	 * deliver the regular configuration
2058 	 *
2060 	 * Unfortunately totempg doesn't appear to like this operating mode
2061 	 * which needs more inspection
2062 	 */
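	/*
	 * Walk backwards from my_high_seq_received until a sequence number
	 * that is actually present in the regular sort queue (or zero) is
	 * found; everything up to that point is treated as delivered and its
	 * buffers are freed and released below.
	 */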
2063 	i = instance->my_high_seq_received + 1;
2064 	do {
2065 		void *ptr;
2066 
2067 		i -= 1;
2068 		res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2069 		if (i == 0) {
2070 			break;
2071 		}
2072 	} while (res);
2073 
2074 	instance->my_high_delivered = i;
2075 
2076 	for (i = 0; i <= instance->my_high_delivered; i++) {
2077 		void *ptr;
2078 
2079 		res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2080 		if (res == 0) {
2081 			struct sort_queue_item *regular_message;
2082 
2083 			regular_message = ptr;
2084 			free (regular_message->mcast);
2085 		}
2086 	}
2087 	sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2088 	instance->last_released = instance->my_high_delivered;
2089 
2090 	if (joined_list_entries) {
2091 		int sptr = 0;
2092 		sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2093 		for (i=0; i< joined_list_entries; i++) {
2094 			sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %u", joined_list_totemip[i]);
2095 		}
2096 	}
2097 	else {
2098 		joined_node_msg[0] = '\0';
2099 	}
2100 
2101 	if (instance->my_left_memb_entries) {
2102 		int sptr = 0;
2103 		int sptr2 = 0;
2104 		sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2105 		for (i=0; i< instance->my_left_memb_entries; i++) {
2106 			sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %u", left_list[i]);
2107 		}
2108 		for (i=0; i< instance->my_left_memb_entries; i++) {
2109 			if (my_leave_memb_match(instance, left_list[i]) == 0) {
2110 				if (sptr2 == 0) {
2111 					sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2112 				}
2113 				sptr2 += snprintf(failed_node_msg+sptr2, sizeof(failed_node_msg)-sptr2, " %u", left_list[i]);
2114 			}
2115 		}
2116 		if (sptr2 == 0) {
2117 			failed_node_msg[0] = '\0';
2118 		}
2119 	}
2120 	else {
2121 		left_node_msg[0] = '\0';
2122 		failed_node_msg[0] = '\0';
2123 	}
2124 
2125 	my_leave_memb_clear(instance);
2126 
2127 	log_printf (instance->totemsrp_log_level_debug,
2128 		"entering OPERATIONAL state.");
2129 	log_printf (instance->totemsrp_log_level_notice,
2130 		"A new membership (%s:%lld) was formed. Members%s%s",
2131 		totemip_print (&instance->my_ring_id.rep),
2132 		instance->my_ring_id.seq,
2133 		joined_node_msg,
2134 		left_node_msg);
2135 
2136 	if (strlen(failed_node_msg)) {
2137 		log_printf (instance->totemsrp_log_level_notice,
2138 			"Failed to receive the leave message.%s",
2139 			failed_node_msg);
2140 	}
2141 
2142 	instance->memb_state = MEMB_STATE_OPERATIONAL;
2143 
2144 	instance->stats.operational_entered++;
2145 	instance->stats.continuous_gather = 0;
2146 
2147 	instance->my_received_flg = 1;
2148 
2149 	reset_pause_timeout (instance);
2150 
2151 	/*
2152 	 * Save ring id information from this configuration to determine
2153 	 * which processors are transitioning from old regular configuration
2154 	 * in to new regular configuration on the next configuration change
2155 	 */
2156 	memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2157 		sizeof (struct memb_ring_id));
2158 
2159 	return;
2160 }
2161 
2162 static void memb_state_gather_enter (
2163 	struct totemsrp_instance *instance,
2164 	enum gather_state_from gather_from)
2165 {
2166 	int32_t res;
2167 
2168 	instance->orf_token_discard = 1;
2169 
2170 	instance->originated_orf_token = 0;
2171 
2172 	memb_set_merge (
2173 		&instance->my_id, 1,
2174 		instance->my_proc_list, &instance->my_proc_list_entries);
2175 
2176 	memb_join_message_send (instance);
2177 
2178 	/*
2179 	 * Restart the join timeout
2180 	 */
2181 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2182 
2183 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2184 		QB_LOOP_MED,
2185 		instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2186 		(void *)instance,
2187 		memb_timer_function_state_gather,
2188 		&instance->memb_timer_state_gather_join_timeout);
2189 	if (res != 0) {
2190 		log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2191 	}
2192 
2193 	/*
2194 	 * Restart the consensus timeout
2195 	 */
2196 	qb_loop_timer_del (instance->totemsrp_poll_handle,
2197 		instance->memb_timer_state_gather_consensus_timeout);
2198 
2199 	res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2200 		QB_LOOP_MED,
2201 		instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2202 		(void *)instance,
2203 		memb_timer_function_gather_consensus_timeout,
2204 		&instance->memb_timer_state_gather_consensus_timeout);
2205 	if (res != 0) {
2206 		log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2207 	}
2208 
2209 	/*
2210 	 * Cancel the token loss and token retransmission timeouts
2211 	 */
2212 	cancel_token_retransmit_timeout (instance); // REVIEWED
2213 	cancel_token_timeout (instance); // REVIEWED
2214 	cancel_merge_detect_timeout (instance);
2215 
2216 	memb_consensus_reset (instance);
2217 
2218 	memb_consensus_set (instance, &instance->my_id);
2219 
2220 	log_printf (instance->totemsrp_log_level_debug,
2221 		    "entering GATHER state from %d(%s).",
2222 		    gather_from, gsfrom_to_msg(gather_from));
2223 
2224 	instance->memb_state = MEMB_STATE_GATHER;
2225 	instance->stats.gather_entered++;
2226 
2227 	if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2228 		/*
2229 		 * Re-entering gather because the consensus timeout expired means we are continuously gathering.
2230 		 */
2231 		instance->stats.continuous_gather++;
2232 	}
2233 
2234 	return;
2235 }
2236 
2237 static void timer_function_token_retransmit_timeout (void *data);
2238 
2239 static void target_set_completed (
2240 	void *context)
2241 {
2242 	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
2243 
2244 	memb_state_commit_token_send (instance);
2245 
2246 }
2247 
2248 static void memb_state_commit_enter (
2249 	struct totemsrp_instance *instance)
2250 {
2251 	old_ring_state_save (instance);
2252 
2253 	memb_state_commit_token_update (instance);
2254 
2255 	memb_state_commit_token_target_set (instance);
2256 
2257 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2258 
2259 	instance->memb_timer_state_gather_join_timeout = 0;
2260 
2261 	qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2262 
2263 	instance->memb_timer_state_gather_consensus_timeout = 0;
2264 
2265 	memb_ring_id_set (instance, &instance->commit_token->ring_id);
2266 	instance->memb_ring_id_store (&instance->my_ring_id, &instance->my_id.addr[0]);
2267 
2268 	instance->token_ring_id_seq = instance->my_ring_id.seq;
2269 
2270 	log_printf (instance->totemsrp_log_level_debug,
2271 		"entering COMMIT state.");
2272 
2273 	instance->memb_state = MEMB_STATE_COMMIT;
2274 	reset_token_retransmit_timeout (instance); // REVIEWED
2275 	reset_token_timeout (instance); // REVIEWED
2276 
2277 	instance->stats.commit_entered++;
2278 	instance->stats.continuous_gather = 0;
2279 
2280 	/*
2281 	 * reset all flow control variables since we are starting a new ring
2282 	 */
2283 	instance->my_trc = 0;
2284 	instance->my_pbl = 0;
2285 	instance->my_cbl = 0;
2286 	/*
2287 	 * commit token sent after callback that token target has been set
2288 	 */
2289 }
2290 
2291 static void memb_state_recovery_enter (
2292 	struct totemsrp_instance *instance,
2293 	struct memb_commit_token *commit_token)
2294 {
2295 	int i;
2296 	int local_received_flg = 1;
2297 	unsigned int low_ring_aru;
2298 	unsigned int range = 0;
2299 	unsigned int messages_originated = 0;
2300 	const struct srp_addr *addr;
2301 	struct memb_commit_token_memb_entry *memb_list;
2302 	struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2303 
2304 	addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2305 	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2306 
2307 	log_printf (instance->totemsrp_log_level_debug,
2308 		"entering RECOVERY state.");
2309 
2310 	instance->orf_token_discard = 0;
2311 
2312 	instance->my_high_ring_delivered = 0;
2313 
2314 	sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2315 	cs_queue_reinit (&instance->retrans_message_queue);
2316 
2317 	low_ring_aru = instance->old_ring_state_high_seq_received;
2318 
2319 	memb_state_commit_token_send_recovery (instance, commit_token);
2320 
2321 	instance->my_token_seq = SEQNO_START_TOKEN - 1;
2322 
2323 	/*
2324 	 * Build regular configuration
2325 	 */
2326 	totemrrp_processor_count_set (
2327 		instance->totemrrp_context,
2328 		commit_token->addr_entries);
2329 
2330 	/*
2331 	 * Build transitional configuration
2332 	 */
2333 	for (i = 0; i < instance->my_new_memb_entries; i++) {
2334 		memcpy (&my_new_memb_ring_id_list[i],
2335 			&memb_list[i].ring_id,
2336 			sizeof (struct memb_ring_id));
2337 	}
2338 	memb_set_and_with_ring_id (
2339 		instance->my_new_memb_list,
2340 		my_new_memb_ring_id_list,
2341 		instance->my_new_memb_entries,
2342 		instance->my_memb_list,
2343 		instance->my_memb_entries,
2344 		&instance->my_old_ring_id,
2345 		instance->my_trans_memb_list,
2346 		&instance->my_trans_memb_entries);
2347 
2348 	for (i = 0; i < instance->my_trans_memb_entries; i++) {
2349 		log_printf (instance->totemsrp_log_level_debug,
2350 			"TRANS [%d] member %s:", i, totemip_print (&instance->my_trans_memb_list[i].addr[0]));
2351 	}
2352 	for (i = 0; i < instance->my_new_memb_entries; i++) {
2353 		log_printf (instance->totemsrp_log_level_debug,
2354 			"position [%d] member %s:", i, totemip_print (&addr[i].addr[0]));
2355 		log_printf (instance->totemsrp_log_level_debug,
2356 			"previous ring seq %llx rep %s",
2357 			memb_list[i].ring_id.seq,
2358 			totemip_print (&memb_list[i].ring_id.rep));
2359 
2360 		log_printf (instance->totemsrp_log_level_debug,
2361 			"aru %x high delivered %x received flag %d",
2362 			memb_list[i].aru,
2363 			memb_list[i].high_delivered,
2364 			memb_list[i].received_flg);
2365 
2366 	//	assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2367 	}
2368 	/*
2369 	 * Determine if any received flag is false
2370 	 */
2371 	for (i = 0; i < commit_token->addr_entries; i++) {
2372 		if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2373 			instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2374 
2375 			memb_list[i].received_flg == 0) {
2376 			instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2377 			memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2378 				sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2379 			local_received_flg = 0;
2380 			break;
2381 		}
2382 	}
2383 	if (local_received_flg == 1) {
2384 		goto no_originate;
2385 	} /* Else originate messages if we should */
2386 
2387 	/*
2388 	 * Calculate low_ring_aru and instance->my_high_ring_delivered for the transitional membership
2389 	 */
2390 	for (i = 0; i < commit_token->addr_entries; i++) {
2391 		if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2392 			instance->my_deliver_memb_list,
2393 			 instance->my_deliver_memb_entries) &&
2394 
2395 		memcmp (&instance->my_old_ring_id,
2396 			&memb_list[i].ring_id,
2397 			sizeof (struct memb_ring_id)) == 0) {
2398 
2399 			if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2400 
2401 				low_ring_aru = memb_list[i].aru;
2402 			}
2403 			if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2404 				instance->my_high_ring_delivered = memb_list[i].high_delivered;
2405 			}
2406 		}
2407 	}
2408 
2409 	/*
2410 	 * Copy all old ring messages to instance->retrans_message_queue
2411 	 */
2412 	range = instance->old_ring_state_high_seq_received - low_ring_aru;
2413 	if (range == 0) {
2414 		/*
2415 		 * No messages to copy
2416 		 */
2417 		goto no_originate;
2418 	}
2419 	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2420 
2421 	log_printf (instance->totemsrp_log_level_debug,
2422 		"copying all old ring messages from %x-%x.",
2423 		low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2424 
2425 	for (i = 1; i <= range; i++) {
2426 		struct sort_queue_item *sort_queue_item;
2427 		struct message_item message_item;
2428 		void *ptr;
2429 		int res;
2430 
2431 		res = sq_item_get (&instance->regular_sort_queue,
2432 			low_ring_aru + i, &ptr);
2433 		if (res != 0) {
2434 			continue;
2435 		}
2436 		sort_queue_item = ptr;
2437 		messages_originated++;
2438 		memset (&message_item, 0, sizeof (struct message_item));
2439 	// TODO	 LEAK
2440 		message_item.mcast = totemsrp_buffer_alloc (instance);
2441 		assert (message_item.mcast);
2442 		message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2443 		srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2444 		message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
2445 		message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2446 		assert (message_item.mcast->header.nodeid);
2447 		message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2448 		memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2449 			sizeof (struct memb_ring_id));
2450 		message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2451 		memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2452 			sort_queue_item->mcast,
2453 			sort_queue_item->msg_len);
2454 		cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2455 	}
2456 	log_printf (instance->totemsrp_log_level_debug,
2457 		"Originated %d messages in RECOVERY.", messages_originated);
2458 	goto originated;
2459 
2460 no_originate:
2461 	log_printf (instance->totemsrp_log_level_debug,
2462 		"Did not need to originate any messages in recovery.");
2463 
2464 originated:
2465 	instance->my_aru = SEQNO_START_MSG;
2466 	instance->my_aru_count = 0;
2467 	instance->my_seq_unchanged = 0;
2468 	instance->my_high_seq_received = SEQNO_START_MSG;
2469 	instance->my_install_seq = SEQNO_START_MSG;
2470 	instance->last_released = SEQNO_START_MSG;
2471 
2472 	reset_token_timeout (instance); // REVIEWED
2473 	reset_token_retransmit_timeout (instance); // REVIEWED
2474 
2475 	instance->memb_state = MEMB_STATE_RECOVERY;
2476 	instance->stats.recovery_entered++;
2477 	instance->stats.continuous_gather = 0;
2478 
2479 	return;
2480 }
2481 
2482 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2483 {
2484 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2485 
2486 	token_hold_cancel_send (instance);
2487 
2488 	return;
2489 }
2490 
2491 int totemsrp_mcast (
2492 	void *srp_context,
2493 	struct iovec *iovec,
2494 	unsigned int iov_len,
2495 	int guarantee)
2496 {
2497 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2498 	int i;
2499 	struct message_item message_item;
2500 	char *addr;
2501 	unsigned int addr_idx;
2502 	struct cs_queue *queue_use;
2503 
2504 	if (instance->waiting_trans_ack) {
2505 		queue_use = &instance->new_message_queue_trans;
2506 	} else {
2507 		queue_use = &instance->new_message_queue;
2508 	}
2509 
2510 	if (cs_queue_is_full (queue_use)) {
2511 		log_printf (instance->totemsrp_log_level_debug, "queue full");
2512 		return (-1);
2513 	}
2514 
2515 	memset (&message_item, 0, sizeof (struct message_item));
2516 
2517 	/*
2518 	 * Allocate pending item
2519 	 */
2520 	message_item.mcast = totemsrp_buffer_alloc (instance);
2521 	if (message_item.mcast == 0) {
2522 		goto error_mcast;
2523 	}
2524 
2525 	/*
2526 	 * Set mcast header
2527 	 */
2528 	memset(message_item.mcast, 0, sizeof (struct mcast));
2529 	message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2530 	message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2531 	message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
2532 	message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2533 	assert (message_item.mcast->header.nodeid);
2534 
2535 	message_item.mcast->guarantee = guarantee;
2536 	srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2537 
2538 	addr = (char *)message_item.mcast;
2539 	addr_idx = sizeof (struct mcast);
2540 	for (i = 0; i < iov_len; i++) {
2541 		memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2542 		addr_idx += iovec[i].iov_len;
2543 	}
2544 
2545 	message_item.msg_len = addr_idx;
2546 
2547 	log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2548 	instance->stats.mcast_tx++;
2549 	cs_queue_item_add (queue_use, &message_item);
2550 
2551 	return (0);
2552 
2553 error_mcast:
2554 	return (-1);
2555 }
2556 
2557 /*
2558  * Determine if there is room to queue a new message
2559  */
2560 int totemsrp_avail (void *srp_context)
2561 {
2562 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2563 	int avail;
2564 	struct cs_queue *queue_use;
2565 
2566 	if (instance->waiting_trans_ack) {
2567 		queue_use = &instance->new_message_queue_trans;
2568 	} else {
2569 		queue_use = &instance->new_message_queue;
2570 	}
2571 	cs_queue_avail (queue_use, &avail);
2572 
2573 	return (avail);
2574 }
2575 
2576 /*
2577  * ORF Token Management
2578  */
2579 /*
2580  * Recast message to mcast group if it is available
2581  */
2582 static int orf_token_remcast (
2583 	struct totemsrp_instance *instance,
2584 	int seq)
2585 {
2586 	struct sort_queue_item *sort_queue_item;
2587 	int res;
2588 	void *ptr;
2589 
2590 	struct sq *sort_queue;
2591 
2592 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
2593 		sort_queue = &instance->recovery_sort_queue;
2594 	} else {
2595 		sort_queue = &instance->regular_sort_queue;
2596 	}
2597 
2598 	res = sq_in_range (sort_queue, seq);
2599 	if (res == 0) {
2600 		log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2601 		return (-1);
2602 	}
2603 
2604 	/*
2605 	 * Get RTR item at seq, if not available, return
2606 	 */
2607 	res = sq_item_get (sort_queue, seq, &ptr);
2608 	if (res != 0) {
2609 		return -1;
2610 	}
2611 
2612 	sort_queue_item = ptr;
2613 
2614 	totemrrp_mcast_noflush_send (
2615 		instance->totemrrp_context,
2616 		sort_queue_item->mcast,
2617 		sort_queue_item->msg_len);
2618 
2619 	return (0);
2620 }
2621 
2622 
2623 /*
2624  * Free all freeable messages from ring
2625  */
2626 static void messages_free (
2627 	struct totemsrp_instance *instance,
2628 	unsigned int token_aru)
2629 {
2630 	struct sort_queue_item *regular_message;
2631 	unsigned int i;
2632 	int res;
2633 	int log_release = 0;
2634 	unsigned int release_to;
2635 	unsigned int range = 0;
2636 
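	/*
	 * Release no further than the smallest of the aru carried on the
	 * token, this processor's last recorded aru (my_last_aru) and its
	 * highest delivered sequence number (my_high_delivered); anything
	 * beyond that may still be needed for retransmission or delivery.
	 */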
2637 	release_to = token_aru;
2638 	if (sq_lt_compare (instance->my_last_aru, release_to)) {
2639 		release_to = instance->my_last_aru;
2640 	}
2641 	if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2642 		release_to = instance->my_high_delivered;
2643 	}
2644 
2645 	/*
2646 	 * Ensure we don't try to release before an already released point
2647 	 */
2648 	if (sq_lt_compare (release_to, instance->last_released)) {
2649 		return;
2650 	}
2651 
2652 	range = release_to - instance->last_released;
2653 	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2654 
2655 	/*
2656 	 * Release retransmit list items if group aru indicates they are transmitted
2657 	 */
2658 	for (i = 1; i <= range; i++) {
2659 		void *ptr;
2660 
2661 		res = sq_item_get (&instance->regular_sort_queue,
2662 			instance->last_released + i, &ptr);
2663 		if (res == 0) {
2664 			regular_message = ptr;
2665 			totemsrp_buffer_release (instance, regular_message->mcast);
2666 		}
2667 		sq_items_release (&instance->regular_sort_queue,
2668 			instance->last_released + i);
2669 
2670 		log_release = 1;
2671 	}
2672 	instance->last_released += range;
2673 
2674  	if (log_release) {
2675 		log_printf (instance->totemsrp_log_level_trace,
2676 			"releasing messages up to and including %x", release_to);
2677 	}
2678 }
2679 
2680 static void update_aru (
2681 	struct totemsrp_instance *instance)
2682 {
2683 	unsigned int i;
2684 	int res;
2685 	struct sq *sort_queue;
2686 	unsigned int range;
2687 	unsigned int my_aru_saved = 0;
2688 
2689 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
2690 		sort_queue = &instance->recovery_sort_queue;
2691 	} else {
2692 		sort_queue = &instance->regular_sort_queue;
2693 	}
2694 
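	/*
	 * my_aru is the highest sequence number received with no gaps below
	 * it.  Scan forward from the saved value and stop at the first hole;
	 * only the contiguous prefix advances the aru.
	 */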
2695 	range = instance->my_high_seq_received - instance->my_aru;
2696 
2697 	my_aru_saved = instance->my_aru;
2698 	for (i = 1; i <= range; i++) {
2699 
2700 		void *ptr;
2701 
2702 		res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2703 		/*
2704 		 * If hole, stop updating aru
2705 		 */
2706 		if (res != 0) {
2707 			break;
2708 		}
2709 	}
2710 	instance->my_aru += i - 1;
2711 }
2712 
2713 /*
2714  * Multicasts pending messages onto the ring (requires orf_token possession)
2715  */
2716 static int orf_token_mcast (
2717 	struct totemsrp_instance *instance,
2718 	struct orf_token *token,
2719 	int fcc_mcasts_allowed)
2720 {
2721 	struct message_item *message_item = 0;
2722 	struct cs_queue *mcast_queue;
2723 	struct sq *sort_queue;
2724 	struct sort_queue_item sort_queue_item;
2725 	struct mcast *mcast;
2726 	unsigned int fcc_mcast_current;
2727 
2728 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
2729 		mcast_queue = &instance->retrans_message_queue;
2730 		sort_queue = &instance->recovery_sort_queue;
2731 		reset_token_retransmit_timeout (instance); // REVIEWED
2732 	} else {
2733 		if (instance->waiting_trans_ack) {
2734 			mcast_queue = &instance->new_message_queue_trans;
2735 		} else {
2736 			mcast_queue = &instance->new_message_queue;
2737 		}
2738 
2739 		sort_queue = &instance->regular_sort_queue;
2740 	}
2741 
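	/*
	 * Transmit at most fcc_mcasts_allowed pending messages while holding
	 * the token.  Each message is stamped with the next token sequence
	 * number, added to the sort queue for possible retransmission and
	 * multicast without flushing.
	 */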
2742 	for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2743 		if (cs_queue_is_empty (mcast_queue)) {
2744 			break;
2745 		}
2746 		message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2747 
2748 		message_item->mcast->seq = ++token->seq;
2749 		message_item->mcast->this_seqno = instance->global_seqno++;
2750 
2751 		/*
2752 		 * Build IO vector
2753 		 */
2754 		memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2755 		sort_queue_item.mcast = message_item->mcast;
2756 		sort_queue_item.msg_len = message_item->msg_len;
2757 
2758 		mcast = sort_queue_item.mcast;
2759 
2760 		memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2761 
2762 		/*
2763 		 * Add message to retransmit queue
2764 		 */
2765 		sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2766 
2767 		totemrrp_mcast_noflush_send (
2768 			instance->totemrrp_context,
2769 			message_item->mcast,
2770 			message_item->msg_len);
2771 
2772 		/*
2773 		 * Delete item from pending queue
2774 		 */
2775 		cs_queue_item_remove (mcast_queue);
2776 
2777 		/*
2778 		 * If messages mcasted, deliver any new messages to totempg
2779 		 */
2780 		instance->my_high_seq_received = token->seq;
2781 	}
2782 
2783 	update_aru (instance);
2784 
2785 	/*
2786 	 * Return the number of messages multicast, so single node clusters can tell whether more messages are available
2787 	 */
2788 	return (fcc_mcast_current);
2789 }
2790 
2791 /*
2792  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2793  * Modifies orf_token's rtr list to include retransmits required by this processor
2794  */
2795 static int orf_token_rtr (
2796 	struct totemsrp_instance *instance,
2797 	struct orf_token *orf_token,
2798 	unsigned int *fcc_allowed)
2799 {
2800 	unsigned int res;
2801 	unsigned int i, j;
2802 	unsigned int found;
2803 	struct sq *sort_queue;
2804 	struct rtr_item *rtr_list;
2805 	unsigned int range = 0;
2806 	char retransmit_msg[1024];
2807 	char value[64];
2808 
2809 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
2810 		sort_queue = &instance->recovery_sort_queue;
2811 	} else {
2812 		sort_queue = &instance->regular_sort_queue;
2813 	}
2814 
2815 	rtr_list = &orf_token->rtr_list[0];
2816 
2817 	strcpy (retransmit_msg, "Retransmit List: ");
2818 	if (orf_token->rtr_list_entries) {
2819 		log_printf (instance->totemsrp_log_level_debug,
2820 			"Retransmit List %d", orf_token->rtr_list_entries);
2821 		for (i = 0; i < orf_token->rtr_list_entries; i++) {
2822 			sprintf (value, "%x ", rtr_list[i].seq);
2823 			strcat (retransmit_msg, value);
2824 		}
2825 		strcat (retransmit_msg, "");
2826 		log_printf (instance->totemsrp_log_level_notice,
2827 			"%s", retransmit_msg);
2828 	}
2829 
2830 	/*
2831 	 * Retransmit messages on orf_token's RTR list from RTR queue
2832 	 */
2833 	for (instance->fcc_remcast_current = 0, i = 0;
2834 		instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2835 
2836 		/*
2837 		 * If this retransmit request isn't from this configuration,
2838 		 * try next rtr entry
2839 		 */
2840  		if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2841 			sizeof (struct memb_ring_id)) != 0) {
2842 
2843 			i += 1;
2844 			continue;
2845 		}
2846 
2847 		res = orf_token_remcast (instance, rtr_list[i].seq);
2848 		if (res == 0) {
2849 			/*
2850 			 * Multicasted message, so no need to copy to new retransmit list
2851 			 */
2852 			orf_token->rtr_list_entries -= 1;
2853 			assert (orf_token->rtr_list_entries >= 0);
2854 			memmove (&rtr_list[i], &rtr_list[i + 1],
2855 				sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2856 
2857 			instance->stats.mcast_retx++;
2858 			instance->fcc_remcast_current++;
2859 		} else {
2860 			i += 1;
2861 		}
2862 	}
2863 	*fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2864 
2865 	/*
2866 	 * Add messages to retransmit to RTR list
2867 	 * but only retry if there is room in the retransmit list
2868 	 */
2869 
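	/*
	 * Any sequence number between my_aru and the token's seq that this
	 * processor has not received is a candidate for the RTR list, but is
	 * only requested after it has been missed miss_count_const times.
	 */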
2870 	range = orf_token->seq - instance->my_aru;
2871 	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2872 
2873 	for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2874 		(i <= range); i++) {
2875 
2876 		/*
2877 		 * Ensure message is within the sort queue range
2878 		 */
2879 		res = sq_in_range (sort_queue, instance->my_aru + i);
2880 		if (res == 0) {
2881 			break;
2882 		}
2883 
2884 		/*
2885 		 * Find if a message is missing from this processor
2886 		 */
2887 		res = sq_item_inuse (sort_queue, instance->my_aru + i);
2888 		if (res == 0) {
2889 			/*
2890 			 * Determine how many times we have missed receiving
2891 			 * this sequence number.  sq_item_miss_count increments
2892 			 * a counter for the sequence number.  The miss count
2893 			 * will be returned and compared.  This allows time for
2894 			 * delayed multicast messages to be received before
2895 			 * declaring the message is missing and requesting a
2896 			 * retransmit.
2897 			 */
2898 			res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2899 			if (res < instance->totem_config->miss_count_const) {
2900 				continue;
2901 			}
2902 
2903 			/*
2904 			 * Determine if missing message is already in retransmit list
2905 			 */
2906 			found = 0;
2907 			for (j = 0; j < orf_token->rtr_list_entries; j++) {
2908 				if (instance->my_aru + i == rtr_list[j].seq) {
2909 					found = 1;
2910 				}
2911 			}
2912 			if (found == 0) {
2913 				/*
2914 				 * Missing message not found in current retransmit list so add it
2915 				 */
2916 				memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2917 					&instance->my_ring_id, sizeof (struct memb_ring_id));
2918 				rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2919 				orf_token->rtr_list_entries++;
2920 			}
2921 		}
2922 	}
2923 	return (instance->fcc_remcast_current);
2924 }
2925 
2926 static void token_retransmit (struct totemsrp_instance *instance)
2927 {
2928 	totemrrp_token_send (instance->totemrrp_context,
2929 		instance->orf_token_retransmit,
2930 		instance->orf_token_retransmit_size);
2931 }
2932 
2933 /*
2934  * Retransmit the regular token to the next processor if no mcast or
2935  * token has been received within the token retransmit timeout
2936  * period
2937  */
2938 static void timer_function_token_retransmit_timeout (void *data)
2939 {
2940 	struct totemsrp_instance *instance = data;
2941 
2942 	switch (instance->memb_state) {
2943 	case MEMB_STATE_GATHER:
2944 		break;
2945 	case MEMB_STATE_COMMIT:
2946 	case MEMB_STATE_OPERATIONAL:
2947 	case MEMB_STATE_RECOVERY:
2948 		token_retransmit (instance);
2949 		reset_token_retransmit_timeout (instance); // REVIEWED
2950 		break;
2951 	}
2952 }
2953 
2954 static void timer_function_token_hold_retransmit_timeout (void *data)
2955 {
2956 	struct totemsrp_instance *instance = data;
2957 
2958 	switch (instance->memb_state) {
2959 	case MEMB_STATE_GATHER:
2960 		break;
2961 	case MEMB_STATE_COMMIT:
2962 		break;
2963 	case MEMB_STATE_OPERATIONAL:
2964 	case MEMB_STATE_RECOVERY:
2965 		token_retransmit (instance);
2966 		break;
2967 	}
2968 }
2969 
2970 static void timer_function_merge_detect_timeout(void *data)
2971 {
2972 	struct totemsrp_instance *instance = data;
2973 
2974 	instance->my_merge_detect_timeout_outstanding = 0;
2975 
2976 	switch (instance->memb_state) {
2977 	case MEMB_STATE_OPERATIONAL:
2978 		if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
2979 			memb_merge_detect_transmit (instance);
2980 		}
2981 		break;
2982 	case MEMB_STATE_GATHER:
2983 	case MEMB_STATE_COMMIT:
2984 	case MEMB_STATE_RECOVERY:
2985 		break;
2986 	}
2987 }
2988 
2989 /*
2990  * Send orf_token to next member (requires orf_token)
2991  */
2992 static int token_send (
2993 	struct totemsrp_instance *instance,
2994 	struct orf_token *orf_token,
2995 	int forward_token)
2996 {
2997 	int res = 0;
2998 	unsigned int orf_token_size;
2999 
3000 	orf_token_size = sizeof (struct orf_token) +
3001 		(orf_token->rtr_list_entries * sizeof (struct rtr_item));
3002 
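	/*
	 * Keep a copy of the token so timer_function_token_retransmit_timeout
	 * can resend it if neither a token nor a multicast arrives before the
	 * retransmit timeout fires.
	 */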
3003 	orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
3004 	memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3005 	instance->orf_token_retransmit_size = orf_token_size;
3006 	assert (orf_token->header.nodeid);
3007 
3008 	if (forward_token == 0) {
3009 		return (0);
3010 	}
3011 
3012 	totemrrp_token_send (instance->totemrrp_context,
3013 		orf_token,
3014 		orf_token_size);
3015 
3016 	return (res);
3017 }
3018 
3019 static int token_hold_cancel_send (struct totemsrp_instance *instance)
3020 {
3021 	struct token_hold_cancel token_hold_cancel;
3022 
3023 	/*
3024 	 * Only cancel if the token is currently held
3025 	 */
3026 	if (instance->my_token_held == 0) {
3027 		return (0);
3028 	}
3029 	instance->my_token_held = 0;
3030 
3031 	/*
3032 	 * Build message
3033 	 */
3034 	token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
3035 	token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
3036 	token_hold_cancel.header.encapsulated = 0;
3037 	token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
3038 	memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3039 		sizeof (struct memb_ring_id));
3040 	assert (token_hold_cancel.header.nodeid);
3041 
3042 	instance->stats.token_hold_cancel_tx++;
3043 
3044 	totemrrp_mcast_flush_send (instance->totemrrp_context, &token_hold_cancel,
3045 		sizeof (struct token_hold_cancel));
3046 
3047 	return (0);
3048 }
3049 
3050 static int orf_token_send_initial (struct totemsrp_instance *instance)
3051 {
3052 	struct orf_token orf_token;
3053 	int res;
3054 
3055 	orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3056 	orf_token.header.endian_detector = ENDIAN_LOCAL;
3057 	orf_token.header.encapsulated = 0;
3058 	orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
3059 	assert (orf_token.header.nodeid);
3060 	orf_token.seq = SEQNO_START_MSG;
3061 	orf_token.token_seq = SEQNO_START_TOKEN;
3062 	orf_token.retrans_flg = 1;
3063 	instance->my_set_retrans_flg = 1;
3064 	instance->stats.orf_token_tx++;
3065 
3066 	if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3067 		orf_token.retrans_flg = 0;
3068 		instance->my_set_retrans_flg = 0;
3069 	} else {
3070 		orf_token.retrans_flg = 1;
3071 		instance->my_set_retrans_flg = 1;
3072 	}
3073 
3074 	orf_token.aru = 0;
3075 	orf_token.aru = SEQNO_START_MSG - 1;
3076 	orf_token.aru_addr = instance->my_id.addr[0].nodeid;
3077 
3078 	memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3079 	orf_token.fcc = 0;
3080 	orf_token.backlog = 0;
3081 
3082 	orf_token.rtr_list_entries = 0;
3083 
3084 	res = token_send (instance, &orf_token, 1);
3085 
3086 	return (res);
3087 }
3088 
3089 static void memb_state_commit_token_update (
3090 	struct totemsrp_instance *instance)
3091 {
3092 	struct srp_addr *addr;
3093 	struct memb_commit_token_memb_entry *memb_list;
3094 	unsigned int high_aru;
3095 	unsigned int i;
3096 
3097 	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3098 	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3099 
3100 	memcpy (instance->my_new_memb_list, addr,
3101 		sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3102 
3103 	instance->my_new_memb_entries = instance->commit_token->addr_entries;
3104 
3105 	memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3106 		&instance->my_old_ring_id, sizeof (struct memb_ring_id));
3107 
3108 	memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3109 	/*
3110 	 *  TODO high delivered is really instance->my_aru, but with safe this
3111 	 * could change?
3112 	 */
3113 	instance->my_received_flg =
3114 		(instance->my_aru == instance->my_high_seq_received);
3115 
3116 	memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3117 
3118 	memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3119 	/*
3120 	 * find high aru up to current memb_index for all matching ring ids
3121 	 * if any ring id matching memb_index has aru less than high aru, set
3122 	 * received flag for that entry to false
3123 	 */
3124 	high_aru = memb_list[instance->commit_token->memb_index].aru;
3125 	for (i = 0; i <= instance->commit_token->memb_index; i++) {
3126 		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3127 			&memb_list[i].ring_id,
3128 			sizeof (struct memb_ring_id)) == 0) {
3129 
3130 			if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3131 				high_aru = memb_list[i].aru;
3132 			}
3133 		}
3134 	}
3135 
3136 	for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 		if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 			&memb_list[i].ring_id,
3139 			sizeof (struct memb_ring_id)) == 0) {
3140 
3141 			if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3142 				memb_list[i].received_flg = 0;
3143 				if (i == instance->commit_token->memb_index) {
3144 					instance->my_received_flg = 0;
3145 				}
3146 			}
3147 		}
3148 	}
3149 
3150 	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3151 	instance->commit_token->memb_index += 1;
3152 	assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3153 	assert (instance->commit_token->header.nodeid);
3154 }
3155 
3156 static void memb_state_commit_token_target_set (
3157 	struct totemsrp_instance *instance)
3158 {
3159 	struct srp_addr *addr;
3160 	unsigned int i;
3161 
3162 	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3163 
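	/*
	 * Point the token at the member indexed by memb_index (wrapping at
	 * addr_entries), setting that member's address for every configured
	 * interface so the commit token follows the sorted ring order.
	 */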
3164 	for (i = 0; i < instance->totem_config->interface_count; i++) {
3165 		totemrrp_token_target_set (
3166 			instance->totemrrp_context,
3167 			&addr[instance->commit_token->memb_index %
3168 				instance->commit_token->addr_entries].addr[i],
3169 			i);
3170 	}
3171 }
3172 
3173 static int memb_state_commit_token_send_recovery (
3174 	struct totemsrp_instance *instance,
3175 	struct memb_commit_token *commit_token)
3176 {
3177 	unsigned int commit_token_size;
3178 
3179 	commit_token->token_seq++;
3180 	commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3181 	commit_token_size = sizeof (struct memb_commit_token) +
3182 		((sizeof (struct srp_addr) +
3183 			sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3184 	/*
3185 	 * Make a copy for retransmission if necessary
3186 	 */
3187 	memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3188 	instance->orf_token_retransmit_size = commit_token_size;
3189 
3190 	instance->stats.memb_commit_token_tx++;
3191 
3192 	totemrrp_token_send (instance->totemrrp_context,
3193 		commit_token,
3194 		commit_token_size);
3195 
3196 	/*
3197 	 * Request retransmission of the commit token in case it is lost
3198 	 */
3199 	reset_token_retransmit_timeout (instance);
3200 	return (0);
3201 }
3202 
3203 static int memb_state_commit_token_send (
3204 	struct totemsrp_instance *instance)
3205 {
3206 	unsigned int commit_token_size;
3207 
3208 	instance->commit_token->token_seq++;
3209 	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3210 	commit_token_size = sizeof (struct memb_commit_token) +
3211 		((sizeof (struct srp_addr) +
3212 			sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3213 	/*
3214 	 * Make a copy for retransmission if necessary
3215 	 */
3216 	memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3217 	instance->orf_token_retransmit_size = commit_token_size;
3218 
3219 	instance->stats.memb_commit_token_tx++;
3220 
3221 	totemrrp_token_send (instance->totemrrp_context,
3222 		instance->commit_token,
3223 		commit_token_size);
3224 
3225 	/*
3226 	 * Request retransmission of the commit token in case it is lost
3227 	 */
3228 	reset_token_retransmit_timeout (instance);
3229 	return (0);
3230 }
3231 
3232 
3233 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3234 {
3235 	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3236 	int token_memb_entries = 0;
3237 	int i;
3238 	struct totem_ip_address *lowest_addr;
3239 
3240 	memb_set_subtract (token_memb, &token_memb_entries,
3241 		instance->my_proc_list, instance->my_proc_list_entries,
3242 		instance->my_failed_list, instance->my_failed_list_entries);
3243 
3244 	/*
3245 	 * find representative by searching for smallest identifier
3246 	 */
3247 	assert(token_memb_entries > 0);
3248 
3249 	lowest_addr = &token_memb[0].addr[0];
3250 	for (i = 1; i < token_memb_entries; i++) {
3251 		if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
3252 			totemip_copy (lowest_addr, &token_memb[i].addr[0]);
3253 		}
3254 	}
3255 	return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
3256 }
3257 
3258 static int srp_addr_compare (const void *a, const void *b)
3259 {
3260 	const struct srp_addr *srp_a = (const struct srp_addr *)a;
3261 	const struct srp_addr *srp_b = (const struct srp_addr *)b;
3262 
3263 	return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
3264 }
3265 
3266 static void memb_state_commit_token_create (
3267 	struct totemsrp_instance *instance)
3268 {
3269 	struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3270 	struct srp_addr *addr;
3271 	struct memb_commit_token_memb_entry *memb_list;
3272 	int token_memb_entries = 0;
3273 
3274 	log_printf (instance->totemsrp_log_level_debug,
3275 		"Creating commit token because I am the rep.");
3276 
3277 	memb_set_subtract (token_memb, &token_memb_entries,
3278 		instance->my_proc_list, instance->my_proc_list_entries,
3279 		instance->my_failed_list, instance->my_failed_list_entries);
3280 
3281 	memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3282 	instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
3283 	instance->commit_token->header.endian_detector = ENDIAN_LOCAL;
3284 	instance->commit_token->header.encapsulated = 0;
3285 	instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3286 	assert (instance->commit_token->header.nodeid);
3287 
3288 	totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
3289 
3290 	instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3291 
3292 	/*
3293 	 * This qsort is necessary to ensure the commit token traverses
3294 	 * the ring in the proper order
3295 	 */
3296 	qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3297 		srp_addr_compare);
3298 
3299 	instance->commit_token->memb_index = 0;
3300 	instance->commit_token->addr_entries = token_memb_entries;
3301 
3302 	addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3303 	memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3304 
3305 	memcpy (addr, token_memb,
3306 		token_memb_entries * sizeof (struct srp_addr));
3307 	memset (memb_list, 0,
3308 		sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3309 }
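/*
 * On-the-wire layout of the commit token built above.  The send
 * routines size it the same way, and check_memb_commit_token_sanity()
 * and memb_commit_token_endian_convert() walk it the same way:
 *
 *   struct memb_commit_token                          fixed header
 *   struct srp_addr[addr_entries]                     members, sorted by srp_addr_compare()
 *   struct memb_commit_token_memb_entry[addr_entries]
 *                                                     per-member ring_id, aru, high_delivered
 *                                                     and received_flg (zeroed at creation)
 */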
3310 
3311 static void memb_join_message_send (struct totemsrp_instance *instance)
3312 {
3313 	char memb_join_data[40000];
3314 	struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3315 	char *addr;
3316 	unsigned int addr_idx;
3317 	size_t msg_len;
3318 
3319 	memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3320 	memb_join->header.endian_detector = ENDIAN_LOCAL;
3321 	memb_join->header.encapsulated = 0;
3322 	memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
3323 	assert (memb_join->header.nodeid);
3324 
3325 	msg_len = sizeof(struct memb_join) +
3326 	    ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3327 
3328 	if (msg_len > sizeof(memb_join_data)) {
3329 		log_printf (instance->totemsrp_log_level_error,
3330 			"memb_join_message too long. Ignoring message.");
3331 
3332 		return ;
3333 	}
3334 
3335 	memb_join->ring_seq = instance->my_ring_id.seq;
3336 	memb_join->proc_list_entries = instance->my_proc_list_entries;
3337 	memb_join->failed_list_entries = instance->my_failed_list_entries;
3338 	srp_addr_copy (&memb_join->system_from, &instance->my_id);
3339 
3340 	/*
3341 	 * This mess adds the joined and failed processor lists into the join
3342 	 * message
3343 	 */
3344 	addr = (char *)memb_join;
3345 	addr_idx = sizeof (struct memb_join);
3346 	memcpy (&addr[addr_idx],
3347 		instance->my_proc_list,
3348 		instance->my_proc_list_entries *
3349 			sizeof (struct srp_addr));
3350 	addr_idx +=
3351 		instance->my_proc_list_entries *
3352 		sizeof (struct srp_addr);
3353 	memcpy (&addr[addr_idx],
3354 		instance->my_failed_list,
3355 		instance->my_failed_list_entries *
3356 		sizeof (struct srp_addr));
3357 	addr_idx +=
3358 		instance->my_failed_list_entries *
3359 		sizeof (struct srp_addr);
3360 
3361 	if (instance->totem_config->send_join_timeout) {
3362 		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3363 	}
3364 
3365 	instance->stats.memb_join_tx++;
3366 
3367 	totemrrp_mcast_flush_send (
3368 		instance->totemrrp_context,
3369 		memb_join,
3370 		addr_idx);
3371 }
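/*
 * Wire format of the join message assembled above:
 *
 *   struct memb_join                            fixed header
 *   struct srp_addr[proc_list_entries]          my_proc_list
 *   struct srp_addr[failed_list_entries]        my_failed_list
 *
 * Receivers locate the two trailing lists exactly as memb_join_process()
 * and check_memb_join_sanity() do:
 *
 *   proc_list   = (struct srp_addr *)memb_join->end_of_memb_join;
 *   failed_list = proc_list + memb_join->proc_list_entries;
 */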
3372 
3373 static void memb_leave_message_send (struct totemsrp_instance *instance)
3374 {
3375 	char memb_join_data[40000];
3376 	struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3377 	char *addr;
3378 	unsigned int addr_idx;
3379 	int active_memb_entries;
3380 	struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3381 	size_t msg_len;
3382 
3383 	log_printf (instance->totemsrp_log_level_debug,
3384 		"sending join/leave message");
3385 
3386 	/*
3387 	 * add us to the failed list, and remove us from
3388 	 * the members list
3389 	 */
3390 	memb_set_merge(
3391 		       &instance->my_id, 1,
3392 		       instance->my_failed_list, &instance->my_failed_list_entries);
3393 
3394 	memb_set_subtract (active_memb, &active_memb_entries,
3395 			   instance->my_proc_list, instance->my_proc_list_entries,
3396 			   &instance->my_id, 1);
3397 
3398 	msg_len = sizeof(struct memb_join) +
3399 	    ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3400 
3401 	if (msg_len > sizeof(memb_join_data)) {
3402 		log_printf (instance->totemsrp_log_level_error,
3403 			"memb_leave message too long. Ignoring message.");
3404 
3405 		return ;
3406 	}
3407 
3408 	memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3409 	memb_join->header.endian_detector = ENDIAN_LOCAL;
3410 	memb_join->header.encapsulated = 0;
3411 	memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3412 
3413 	memb_join->ring_seq = instance->my_ring_id.seq;
3414 	memb_join->proc_list_entries = active_memb_entries;
3415 	memb_join->failed_list_entries = instance->my_failed_list_entries;
3416 	srp_addr_copy (&memb_join->system_from, &instance->my_id);
3417 	memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
3418 
3419 	// TODO: CC Maybe use the actual join send routine.
3420 	/*
3421 	 * This mess adds the joined and failed processor lists into the join
3422 	 * message
3423 	 */
3424 	addr = (char *)memb_join;
3425 	addr_idx = sizeof (struct memb_join);
3426 	memcpy (&addr[addr_idx],
3427 		active_memb,
3428 		active_memb_entries *
3429 			sizeof (struct srp_addr));
3430 	addr_idx +=
3431 		active_memb_entries *
3432 		sizeof (struct srp_addr);
3433 	memcpy (&addr[addr_idx],
3434 		instance->my_failed_list,
3435 		instance->my_failed_list_entries *
3436 		sizeof (struct srp_addr));
3437 	addr_idx +=
3438 		instance->my_failed_list_entries *
3439 		sizeof (struct srp_addr);
3440 
3441 
3442 	if (instance->totem_config->send_join_timeout) {
3443 		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3444 	}
3445 	instance->stats.memb_join_tx++;
3446 
3447 	totemrrp_mcast_flush_send (
3448 		instance->totemrrp_context,
3449 		memb_join,
3450 		addr_idx);
3451 }
3452 
3453 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3454 {
3455 	struct memb_merge_detect memb_merge_detect;
3456 
3457 	memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
3458 	memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
3459 	memb_merge_detect.header.encapsulated = 0;
3460 	memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
3461 	srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3462 	memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3463 		sizeof (struct memb_ring_id));
3464 	assert (memb_merge_detect.header.nodeid);
3465 
3466 	instance->stats.memb_merge_detect_tx++;
3467 	totemrrp_mcast_flush_send (instance->totemrrp_context,
3468 		&memb_merge_detect,
3469 		sizeof (struct memb_merge_detect));
3470 }
3471 
3472 static void memb_ring_id_set (
3473 	struct totemsrp_instance *instance,
3474 	const struct memb_ring_id *ring_id)
3475 {
3476 
3477 	memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3478 }
3479 
3480 int totemsrp_callback_token_create (
3481 	void *srp_context,
3482 	void **handle_out,
3483 	enum totem_callback_token_type type,
3484 	int delete,
3485 	int (*callback_fn) (enum totem_callback_token_type type, const void *),
3486 	const void *data)
3487 {
3488 	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3489 	struct token_callback_instance *callback_handle;
3490 
3491 	token_hold_cancel_send (instance);
3492 
3493 	callback_handle = malloc (sizeof (struct token_callback_instance));
3494 	if (callback_handle == 0) {
3495 		return (-1);
3496 	}
3497 	*handle_out = (void *)callback_handle;
3498 	list_init (&callback_handle->list);
3499 	callback_handle->callback_fn = callback_fn;
3500 	callback_handle->data = (void *) data;
3501 	callback_handle->callback_type = type;
3502 	callback_handle->delete = delete;
3503 	switch (type) {
3504 	case TOTEM_CALLBACK_TOKEN_RECEIVED:
3505 		list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3506 		break;
3507 	case TOTEM_CALLBACK_TOKEN_SENT:
3508 		list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3509 		break;
3510 	}
3511 
3512 	return (0);
3513 }
3514 
3515 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3516 {
3517 	struct token_callback_instance *h;
3518 
3519 	if (*handle_out) {
3520  		h = (struct token_callback_instance *)*handle_out;
3521 		list_del (&h->list);
3522 		free (h);
3523 		h = NULL;
3524 		*handle_out = 0;
3525 	}
3526 }
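/*
 * Usage sketch for the token callback API above.  This block is purely
 * illustrative and compiled out; example_register() and my_token_seen()
 * are hypothetical caller names, while the totemsrp_* calls and
 * TOTEM_CALLBACK_TOKEN_RECEIVED are the interfaces used in this file.
 * With delete = 1 the callback is one-shot: token_callbacks_execute()
 * frees it after it runs, unless it returns -1, in which case it is
 * retried on the next token.
 */
#if 0
static int my_token_seen (enum totem_callback_token_type type, const void *data)
{
	/* returning 0 lets the one-shot entry be freed after this call */
	return (0);
}

static void example_register (void *srp_context)
{
	void *handle;

	if (totemsrp_callback_token_create (srp_context, &handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED, 1, my_token_seen, NULL) != 0) {
		return;
	}

	/*
	 * A persistent callback (delete = 0) stays registered until the
	 * caller removes it explicitly:
	 *
	 *	totemsrp_callback_token_destroy (srp_context, &handle);
	 */
}
#endif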
3527 
3528 static void token_callbacks_execute (
3529 	struct totemsrp_instance *instance,
3530 	enum totem_callback_token_type type)
3531 {
3532 	struct list_head *list;
3533 	struct list_head *list_next;
3534 	struct list_head *callback_listhead = 0;
3535 	struct token_callback_instance *token_callback_instance;
3536 	int res;
3537 	int del;
3538 
3539 	switch (type) {
3540 	case TOTEM_CALLBACK_TOKEN_RECEIVED:
3541 		callback_listhead = &instance->token_callback_received_listhead;
3542 		break;
3543 	case TOTEM_CALLBACK_TOKEN_SENT:
3544 		callback_listhead = &instance->token_callback_sent_listhead;
3545 		break;
3546 	default:
3547 		assert (0);
3548 	}
3549 
3550 	for (list = callback_listhead->next; list != callback_listhead;
3551 		list = list_next) {
3552 
3553 		token_callback_instance = list_entry (list, struct token_callback_instance, list);
3554 
3555 		list_next = list->next;
3556 		del = token_callback_instance->delete;
3557 		if (del == 1) {
3558 			list_del (list);
3559 		}
3560 
3561 		res = token_callback_instance->callback_fn (
3562 			token_callback_instance->callback_type,
3563 			token_callback_instance->data);
3564 		/*
3565 		 * This callback failed to execute, try it again on the next token
3566 		 */
3567 		if (res == -1 && del == 1) {
3568 			list_add (list, callback_listhead);
3569 		} else	if (del) {
3570 			free (token_callback_instance);
3571 		}
3572 	}
3573 }
3574 
3575 /*
3576  * Flow control functions
3577  */
3578 static unsigned int backlog_get (struct totemsrp_instance *instance)
3579 {
3580 	unsigned int backlog = 0;
3581 	struct cs_queue *queue_use = NULL;
3582 
3583 	if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3584 		if (instance->waiting_trans_ack) {
3585 			queue_use = &instance->new_message_queue_trans;
3586 		} else {
3587 			queue_use = &instance->new_message_queue;
3588 		}
3589 	} else
3590 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
3591 		queue_use = &instance->retrans_message_queue;
3592 	}
3593 
3594 	if (queue_use != NULL) {
3595 		backlog = cs_queue_used (queue_use);
3596 	}
3597 
3598 	instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3599 	return (backlog);
3600 }
3601 
3602 static int fcc_calculate (
3603 	struct totemsrp_instance *instance,
3604 	struct orf_token *token)
3605 {
3606 	unsigned int transmits_allowed;
3607 	unsigned int backlog_calc;
3608 
3609 	transmits_allowed = instance->totem_config->max_messages;
3610 
3611 	if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3612 		transmits_allowed = instance->totem_config->window_size - token->fcc;
3613 	}
3614 
3615 	instance->my_cbl = backlog_get (instance);
3616 
3617 	/*
3618 	 * Only do the backlog calculation if there is a backlog,
3619 	 * otherwise we would divide by zero
3620 	 */
3621 	if (token->backlog + instance->my_cbl - instance->my_pbl) {
3622 		backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3623 			(token->backlog + instance->my_cbl - instance->my_pbl);
3624 		if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3625 			transmits_allowed = backlog_calc;
3626 		}
3627 	}
3628 
3629 	return (transmits_allowed);
3630 }
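/*
 * Worked example for fcc_calculate() above (all numbers are purely
 * illustrative): with max_messages = 17, window_size = 50 and an
 * incoming token carrying fcc = 40, the window limit gives
 * 50 - 40 = 10, so transmits_allowed drops from 17 to 10.  If this
 * node now measures a current backlog my_cbl = 30 against a previous
 * backlog my_pbl = 20 and the token carries backlog = 90, then
 *
 *   backlog_calc = (50 * 20) / (90 + 30 - 20) = 1000 / 100 = 10
 *
 * which does not further reduce the allowance; a larger cluster-wide
 * backlog would shrink this node's share of the window.
 */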
3631 
3632 /*
3633  * don't overflow the RTR sort queue
3634  */
3635 static void fcc_rtr_limit (
3636 	struct totemsrp_instance *instance,
3637 	struct orf_token *token,
3638 	unsigned int *transmits_allowed)
3639 {
3640 	int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3641 	check -= (*transmits_allowed + instance->totem_config->window_size);
3642 	assert (check >= 0);
3643 	if (sq_lt_compare (instance->last_released +
3644 		QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3645 		instance->totem_config->window_size,
3646 
3647 			token->seq)) {
3648 
3649 			*transmits_allowed = 0;
3650 	}
3651 }
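/*
 * Worked example for fcc_rtr_limit() above: the sort queue holds
 * QUEUE_RTR_ITEMS_SIZE_MAX items starting at last_released, so a full
 * window of new traffic must still fit between token->seq and the end
 * of the queue.  If, for example, last_released = 1000 and
 *
 *   1000 + QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed - window_size
 *
 * is already below token->seq, the allowance is zeroed for this
 * rotation rather than risk overflowing the RTR sort queue.
 */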
3652 
3653 static void fcc_token_update (
3654 	struct totemsrp_instance *instance,
3655 	struct orf_token *token,
3656 	unsigned int msgs_transmitted)
3657 {
3658 	token->fcc += msgs_transmitted - instance->my_trc;
3659 	token->backlog += instance->my_cbl - instance->my_pbl;
3660 	instance->my_trc = msgs_transmitted;
3661 	instance->my_pbl = instance->my_cbl;
3662 }
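/*
 * Note on fcc_token_update() above: each node replaces its previous
 * contribution (my_trc messages, my_pbl backlog) with the figures for
 * this rotation, so token->fcc and token->backlog always hold the sum
 * of every node's most recent counts rather than an ever-growing total.
 */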
3663 
3664 /*
3665  * Sanity checkers
3666  */
3667 static int check_totemip_sanity(
3668 	const struct totemsrp_instance *instance,
3669 	const struct totem_ip_address *addr,
3670 	int endian_conversion_needed)
3671 {
3672 	unsigned short family;
3673 
3674 	family = addr->family;
3675 	if (endian_conversion_needed) {
3676 		family = swab16(family);
3677 	}
3678 
3679 	if (family != AF_INET && family != AF_INET6) {
3680 		log_printf (instance->totemsrp_log_level_security,
3681 		    "Received message corrupted...  ignoring.");
3682 
3683 		return (-1);
3684 	}
3685 
3686 	return (0);
3687 }
3688 
3689 static int check_srpaddr_sanity(
3690 	const struct totemsrp_instance *instance,
3691 	const struct srp_addr *addr,
3692 	int endian_conversion_needed)
3693 {
3694 	int i;
3695 
3696 	if (addr->no_addrs < 1 || addr->no_addrs > INTERFACE_MAX) {
3697 		return (-1);
3698 	}
3699 
3700 	for (i = 0; i < addr->no_addrs; i++) {
3701 		if (i == 0 || addr->addr[i].family != 0) {
3702 			if (check_totemip_sanity(instance, &addr->addr[i], endian_conversion_needed) == -1) {
3703 				return (-1);
3704 			}
3705 		}
3706 	}
3707 
3708 	return (0);
3709 }
3710 
3711 static int check_orf_token_sanity(
3712 	const struct totemsrp_instance *instance,
3713 	const void *msg,
3714 	size_t msg_len,
3715 	int endian_conversion_needed)
3716 {
3717 	int rtr_entries;
3718 	const struct orf_token *token = (const struct orf_token *)msg;
3719 	size_t required_len;
3720 	int i;
3721 
3722 	if (msg_len < sizeof(struct orf_token)) {
3723 		log_printf (instance->totemsrp_log_level_security,
3724 		    "Received orf_token message is too short...  ignoring.");
3725 
3726 		return (-1);
3727 	}
3728 
3729 	if (check_totemip_sanity(instance, &token->ring_id.rep, endian_conversion_needed) == -1) {
3730 		return (-1);
3731 	}
3732 
3733 	if (endian_conversion_needed) {
3734 		rtr_entries = swab32(token->rtr_list_entries);
3735 	} else {
3736 		rtr_entries = token->rtr_list_entries;
3737 	}
3738 
3739 	required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3740 	if (msg_len < required_len) {
3741 		log_printf (instance->totemsrp_log_level_security,
3742 		    "Received orf_token message is too short...  ignoring.");
3743 
3744 		return (-1);
3745 	}
3746 
3747 	for (i = 0; i < rtr_entries; i++) {
3748 		if (check_totemip_sanity(instance, &token->rtr_list[i].ring_id.rep,
3749 		    endian_conversion_needed) == -1) {
3750 			return (-1);
3751 		}
3752 	}
3753 
3754 	return (0);
3755 }
3756 
3757 static int check_mcast_sanity(
3758 	struct totemsrp_instance *instance,
3759 	const void *msg,
3760 	size_t msg_len,
3761 	int endian_conversion_needed)
3762 {
3763 	const struct mcast *mcast_msg = (const struct mcast *)msg;
3764 
3765 	if (msg_len < sizeof(struct mcast)) {
3766 		log_printf (instance->totemsrp_log_level_security,
3767 		    "Received mcast message is too short...  ignoring.");
3768 
3769 		return (-1);
3770 	}
3771 
3772 	if ((check_totemip_sanity(instance, &mcast_msg->ring_id.rep, endian_conversion_needed) == -1) ||
3773 	    (check_srpaddr_sanity(instance, &mcast_msg->system_from, endian_conversion_needed) == -1)) {
3774 		return (-1);
3775 	}
3776 
3777 	return (0);
3778 }
3779 
3780 static int check_memb_merge_detect_sanity(
3781 	struct totemsrp_instance *instance,
3782 	const void *msg,
3783 	size_t msg_len,
3784 	int endian_conversion_needed)
3785 {
3786 	const struct memb_merge_detect *mmd_msg = (const struct memb_merge_detect *)msg;
3787 
3788 	if (msg_len < sizeof(struct memb_merge_detect)) {
3789 		log_printf (instance->totemsrp_log_level_security,
3790 		    "Received memb_merge_detect message is too short...  ignoring.");
3791 
3792 		return (-1);
3793 	}
3794 
3795 	if ((check_totemip_sanity(instance, &mmd_msg->ring_id.rep, endian_conversion_needed) == -1) ||
3796 	    (check_srpaddr_sanity(instance, &mmd_msg->system_from, endian_conversion_needed) == -1)) {
3797 		return (-1);
3798 	}
3799 
3800 	return (0);
3801 }
3802 
3803 static int check_memb_join_sanity(
3804 	struct totemsrp_instance *instance,
3805 	const void *msg,
3806 	size_t msg_len,
3807 	int endian_conversion_needed)
3808 {
3809 	const struct memb_join *mj_msg = (const struct memb_join *)msg;
3810 	unsigned int proc_list_entries;
3811 	unsigned int failed_list_entries;
3812 	size_t required_len;
3813 	const struct srp_addr *proc_list;
3814 	const struct srp_addr *failed_list;
3815 	int i;
3816 
3817 	if (msg_len < sizeof(struct memb_join)) {
3818 		log_printf (instance->totemsrp_log_level_security,
3819 		    "Received memb_join message is too short...  ignoring.");
3820 
3821 		return (-1);
3822 	}
3823 
3824 	if (check_srpaddr_sanity(instance, &mj_msg->system_from, endian_conversion_needed) == -1) {
3825 		return (-1);
3826 	}
3827 
3828 	proc_list_entries = mj_msg->proc_list_entries;
3829 	failed_list_entries = mj_msg->failed_list_entries;
3830 
3831 	if (endian_conversion_needed) {
3832 		proc_list_entries = swab32(proc_list_entries);
3833 		failed_list_entries = swab32(failed_list_entries);
3834 	}
3835 
3836 	required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3837 	if (msg_len < required_len) {
3838 		log_printf (instance->totemsrp_log_level_security,
3839 		    "Received memb_join message is too short...  ignoring.");
3840 
3841 		return (-1);
3842 	}
3843 
3844 	proc_list = (struct srp_addr *)mj_msg->end_of_memb_join;
3845 	failed_list = proc_list + proc_list_entries;
3846 
3847 	for (i = 0; i < proc_list_entries; i++) {
3848 		if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
3849 			return (-1);
3850 		}
3851 	}
3852 
3853 	for (i = 0; i < failed_list_entries; i++) {
3854 		if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
3855 			return (-1);
3856 		}
3857 	}
3858 
3859 	return (0);
3860 }
3861 
3862 static int check_memb_commit_token_sanity(
3863 	struct totemsrp_instance *instance,
3864 	const void *msg,
3865 	size_t msg_len,
3866 	int endian_conversion_needed)
3867 {
3868 	const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3869 	unsigned int addr_entries;
3870 	const struct srp_addr *addr;
3871 	const struct memb_commit_token_memb_entry *memb_list;
3872 	size_t required_len;
3873 	int i;
3874 
3875 	if (msg_len < sizeof(struct memb_commit_token)) {
3876 		log_printf (instance->totemsrp_log_level_security,
3877 		    "Received memb_commit_token message is too short...  ignoring.");
3878 
3879 		return (-1);
3880 	}
3881 
3882 	if (check_totemip_sanity(instance, &mct_msg->ring_id.rep, endian_conversion_needed) == -1) {
3883 		return (-1);
3884 	}
3885 
3886 	addr_entries = mct_msg->addr_entries;
3887 	if (endian_conversion_needed) {
3888 		addr_entries = swab32(addr_entries);
3889 	}
3890 
3891 	required_len = sizeof(struct memb_commit_token) +
3892 	    (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3893 	if (msg_len < required_len) {
3894 		log_printf (instance->totemsrp_log_level_security,
3895 		    "Received memb_commit_token message is too short...  ignoring.");
3896 
3897 		return (-1);
3898 	}
3899 
3900 	addr = (const struct srp_addr *)mct_msg->end_of_commit_token;
3901 	memb_list = (const struct memb_commit_token_memb_entry *)(addr + addr_entries);
3902 
3903 	for (i = 0; i < addr_entries; i++) {
3904 		if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3905 			return (-1);
3906 		}
3907 
3908 		if (memb_list[i].ring_id.rep.family != 0) {
3909 			if (check_totemip_sanity(instance, &memb_list[i].ring_id.rep,
3910 			    endian_conversion_needed) == -1) {
3911 				return (-1);
3912 			}
3913 		}
3914 	}
3915 
3916 	return (0);
3917 }
3918 
3919 static int check_token_hold_cancel_sanity(
3920 	struct totemsrp_instance *instance,
3921 	const void *msg,
3922 	size_t msg_len,
3923 	int endian_conversion_needed)
3924 {
3925 	const struct token_hold_cancel *thc_msg = (const struct token_hold_cancel *)msg;
3926 
3927 	if (msg_len < sizeof(struct token_hold_cancel)) {
3928 		log_printf (instance->totemsrp_log_level_security,
3929 		    "Received token_hold_cancel message is too short...  ignoring.");
3930 
3931 		return (-1);
3932 	}
3933 
3934 	if (check_totemip_sanity(instance, &thc_msg->ring_id.rep, endian_conversion_needed) == -1) {
3935 		return (-1);
3936 	}
3937 
3938 	return (0);
3939 }
3940 
3941 /*
3942  * Message Handlers
3943  */
3944 
3945 unsigned long long int tv_old;
3946 /*
3947  * message handler called when TOKEN message type received
3948  */
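/*
 * Outline of the handling below: the token is sanity checked, endian
 * converted if needed and copied (together with its retransmit list)
 * so pending receives can be flushed first.  An unchanged token
 * sequence starts merge detection and may lead to holding the token;
 * otherwise, for tokens belonging to this ring and not already seen,
 * the node services retransmit requests, multicasts new messages
 * within the flow control allowance, updates fcc/backlog and aru
 * bookkeeping, handles the recovery-state retrans_flg/install_seq
 * logic, forwards the token and finally delivers any newly ordered
 * messages to the application.
 */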
3949 static int message_handler_orf_token (
3950 	struct totemsrp_instance *instance,
3951 	const void *msg,
3952 	size_t msg_len,
3953 	int endian_conversion_needed)
3954 {
3955 	char token_storage[1500];
3956 	char token_convert[1500];
3957 	struct orf_token *token = NULL;
3958 	int forward_token;
3959 	unsigned int transmits_allowed;
3960 	unsigned int mcasted_retransmit;
3961 	unsigned int mcasted_regular;
3962 	unsigned int last_aru;
3963 
3964 #ifdef GIVEINFO
3965 	unsigned long long tv_current;
3966 	unsigned long long tv_diff;
3967 
3968 	tv_current = qb_util_nano_current_get ();
3969 	tv_diff = tv_current - tv_old;
3970 	tv_old = tv_current;
3971 
3972 	log_printf (instance->totemsrp_log_level_debug,
3973 	"Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3974 #endif
3975 
3976 	if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3977 		return (0);
3978 	}
3979 
3980 	if (instance->orf_token_discard) {
3981 		return (0);
3982 	}
3983 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3984 	if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3985 		return (0);
3986 	}
3987 #endif
3988 
3989 	if (endian_conversion_needed) {
3990 		orf_token_endian_convert ((struct orf_token *)msg,
3991 			(struct orf_token *)token_convert);
3992 		msg = (struct orf_token *)token_convert;
3993 	}
3994 
3995 	/*
3996 	 * Make copy of token and retransmit list in case we have
3997 	 * to flush incoming messages from the kernel queue
3998 	 */
3999 	token = (struct orf_token *)token_storage;
4000 	memcpy (token, msg, sizeof (struct orf_token));
4001 	memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
4002 		sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
4003 
4004 
4005 	/*
4006 	 * Handle merge detection timeout
4007 	 */
4008 	if (token->seq == instance->my_last_seq) {
4009 		start_merge_detect_timeout (instance);
4010 		instance->my_seq_unchanged += 1;
4011 	} else {
4012 		cancel_merge_detect_timeout (instance);
4013 		cancel_token_hold_retransmit_timeout (instance);
4014 		instance->my_seq_unchanged = 0;
4015 	}
4016 
4017 	instance->my_last_seq = token->seq;
4018 
4019 #ifdef TEST_RECOVERY_MSG_COUNT
4020 	if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
4021 		return (0);
4022 	}
4023 #endif
4024 	instance->flushing = 1;
4025 	totemrrp_recv_flush (instance->totemrrp_context);
4026 	instance->flushing = 0;
4027 
4028 	/*
4029 	 * Determine if we should hold (in reality drop) the token
4030 	 */
4031 	instance->my_token_held = 0;
4032 	if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
4033 		instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
4034 		instance->my_token_held = 1;
4035 	} else
4036 		if (!totemip_equal(&instance->my_ring_id.rep,  &instance->my_id.addr[0]) &&
4037 		instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
4038 		instance->my_token_held = 1;
4039 	}
4040 
4041 	/*
4042 	 * Hold onto token when there is no activity on ring and
4043 	 * this processor is the ring rep
4044 	 */
4045 	forward_token = 1;
4046 	if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
4047 		if (instance->my_token_held) {
4048 			forward_token = 0;
4049 		}
4050 	}
4051 
4052 	token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
4053 
4054 	switch (instance->memb_state) {
4055 	case MEMB_STATE_COMMIT:
4056 		 /* Discard token */
4057 		break;
4058 
4059 	case MEMB_STATE_OPERATIONAL:
4060 		messages_free (instance, token->aru);
4061 		/*
4062 		 * Do NOT add break, this case should also execute code in gather case.
4063 		 */
4064 
4065 	case MEMB_STATE_GATHER:
4066 		/*
4067 		 * DO NOT add break, we use different free mechanism in recovery state
4068 		 */
4069 
4070 	case MEMB_STATE_RECOVERY:
4071 		/*
4072 		 * Discard tokens from another configuration
4073 		 */
4074 		if (memcmp (&token->ring_id, &instance->my_ring_id,
4075 			sizeof (struct memb_ring_id)) != 0) {
4076 
4077 			if ((forward_token)
4078 				&& instance->use_heartbeat) {
4079 				reset_heartbeat_timeout(instance);
4080 			}
4081 			else {
4082 				cancel_heartbeat_timeout(instance);
4083 			}
4084 
4085 			return (0); /* discard token */
4086 		}
4087 
4088 		/*
4089 		 * Discard retransmitted tokens
4090 		 */
4091 		if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
4092 			return (0); /* discard token */
4093 		}
4094 		last_aru = instance->my_last_aru;
4095 		instance->my_last_aru = token->aru;
4096 
4097 		transmits_allowed = fcc_calculate (instance, token);
4098 		mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4099 
4100 		if (instance->my_token_held == 1 &&
4101 			(token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4102 			instance->my_token_held = 0;
4103 			forward_token = 1;
4104 		}
4105 
4106 		fcc_rtr_limit (instance, token, &transmits_allowed);
4107 		mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4108 /*
4109 if (mcasted_regular) {
4110 printf ("mcasted regular %d\n", mcasted_regular);
4111 printf ("token seq %d\n", token->seq);
4112 }
4113 */
4114 		fcc_token_update (instance, token, mcasted_retransmit +
4115 			mcasted_regular);
4116 
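		/*
		 * Publish this node's aru on the token if it is lower than the
		 * token aru, or if this node set the current aru_addr (or none
		 * is set); aru_addr is cleared once the aru catches up with
		 * the token sequence.
		 */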
4117 		if (sq_lt_compare (instance->my_aru, token->aru) ||
4118 			instance->my_id.addr[0].nodeid ==  token->aru_addr ||
4119 			token->aru_addr == 0) {
4120 
4121 			token->aru = instance->my_aru;
4122 			if (token->aru == token->seq) {
4123 				token->aru_addr = 0;
4124 			} else {
4125 				token->aru_addr = instance->my_id.addr[0].nodeid;
4126 			}
4127 		}
4128 		if (token->aru == last_aru && token->aru_addr != 0) {
4129 			instance->my_aru_count += 1;
4130 		} else {
4131 			instance->my_aru_count = 0;
4132 		}
4133 
4134 		/*
4135 		 * We don't strictly follow the specification here. In the specification,
4136 		 * OTHER nodes detect the failure of a node (based on aru_count) and my_id
4137 		 * is NEVER added to the failed list (so a node never marks itself as failed).
4138 		 */
4139 		if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4140 			token->aru_addr == instance->my_id.addr[0].nodeid) {
4141 
4142 			log_printf (instance->totemsrp_log_level_error,
4143 				"FAILED TO RECEIVE");
4144 
4145 			instance->failed_to_recv = 1;
4146 
4147 			memb_set_merge (&instance->my_id, 1,
4148 				instance->my_failed_list,
4149 				&instance->my_failed_list_entries);
4150 
4151 			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4152 		} else {
4153 			instance->my_token_seq = token->token_seq;
4154 			token->token_seq += 1;
4155 
4156 			if (instance->memb_state == MEMB_STATE_RECOVERY) {
4157 				/*
4158 				 * instance->my_aru == instance->my_high_seq_received means this processor
4159 				 * has recovered all messages it can recover
4160 				 * (ie: its retrans queue is empty)
4161 				 */
4162 				if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4163 
4164 					if (token->retrans_flg == 0) {
4165 						token->retrans_flg = 1;
4166 						instance->my_set_retrans_flg = 1;
4167 					}
4168 				} else
4169 				if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4170 					token->retrans_flg = 0;
4171 					instance->my_set_retrans_flg = 0;
4172 				}
4173 				log_printf (instance->totemsrp_log_level_debug,
4174 					"token retrans flag is %d my set retrans flag %d retrans queue empty %d count %d, aru %x",
4175 					token->retrans_flg, instance->my_set_retrans_flg,
4176 					cs_queue_is_empty (&instance->retrans_message_queue),
4177 					instance->my_retrans_flg_count, token->aru);
4178 				if (token->retrans_flg == 0) {
4179 					instance->my_retrans_flg_count += 1;
4180 				} else {
4181 					instance->my_retrans_flg_count = 0;
4182 				}
4183 				if (instance->my_retrans_flg_count == 2) {
4184 					instance->my_install_seq = token->seq;
4185 				}
4186 				log_printf (instance->totemsrp_log_level_debug,
4187 					"install seq %x aru %x high seq received %x",
4188 					instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4189 				if (instance->my_retrans_flg_count >= 2 &&
4190 					instance->my_received_flg == 0 &&
4191 					sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4192 					instance->my_received_flg = 1;
4193 					instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4194 					memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4195 						sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4196 				}
4197 				if (instance->my_retrans_flg_count >= 3 &&
4198 					sq_lte_compare (instance->my_install_seq, token->aru)) {
4199 					instance->my_rotation_counter += 1;
4200 				} else {
4201 					instance->my_rotation_counter = 0;
4202 				}
4203 				if (instance->my_rotation_counter == 2) {
4204 					log_printf (instance->totemsrp_log_level_debug,
4205 						"retrans flag count %x token aru %x install seq %x aru %x %x",
4206 						instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4207 						instance->my_aru, token->seq);
4208 
4209 					memb_state_operational_enter (instance);
4210 					instance->my_rotation_counter = 0;
4211 					instance->my_retrans_flg_count = 0;
4212 				}
4213 			}
4214 
4215 			totemrrp_send_flush (instance->totemrrp_context);
4216 			token_send (instance, token, forward_token);
4217 
4218 #ifdef GIVEINFO
4219 			tv_current = qb_util_nano_current_get ();
4220 			tv_diff = tv_current - tv_old;
4221 			tv_old = tv_current;
4222 			log_printf (instance->totemsrp_log_level_debug,
4223 				"I held %0.4f ms",
4224 				((float)tv_diff) / 1000000.0);
4225 #endif
4226 			if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4227 				messages_deliver_to_app (instance, 0,
4228 					instance->my_high_seq_received);
4229 			}
4230 
4231 			/*
4232 			 * Deliver messages after token has been transmitted
4233 			 * to improve performance
4234 			 */
4235 			reset_token_timeout (instance); // REVIEWED
4236 			reset_token_retransmit_timeout (instance); // REVIEWED
4237 			if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
4238 				instance->my_token_held == 1) {
4239 
4240 				start_token_hold_retransmit_timeout (instance);
4241 			}
4242 
4243 			token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4244 		}
4245 		break;
4246 	}
4247 
4248 	if ((forward_token)
4249 		&& instance->use_heartbeat) {
4250 		reset_heartbeat_timeout(instance);
4251 	}
4252 	else {
4253 		cancel_heartbeat_timeout(instance);
4254 	}
4255 
4256 	return (0);
4257 }
4258 
4259 static void messages_deliver_to_app (
4260 	struct totemsrp_instance *instance,
4261 	int skip,
4262 	unsigned int end_point)
4263 {
4264 	struct sort_queue_item *sort_queue_item_p;
4265 	unsigned int i;
4266 	int res;
4267 	struct mcast *mcast_in;
4268 	struct mcast mcast_header;
4269 	unsigned int range = 0;
4270 	int endian_conversion_required;
4271 	unsigned int my_high_delivered_stored = 0;
4272 
4273 
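	/*
	 * Sequence numbers are unsigned and may wrap; the subtraction below
	 * therefore yields the forward distance from my_high_delivered to
	 * end_point modulo 2^32, and the assert keeps it inside the sort
	 * queue capacity.
	 */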
4274 	range = end_point - instance->my_high_delivered;
4275 
4276 	if (range) {
4277 		log_printf (instance->totemsrp_log_level_trace,
4278 			"Delivering %x to %x", instance->my_high_delivered,
4279 			end_point);
4280 	}
4281 	assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4282 	my_high_delivered_stored = instance->my_high_delivered;
4283 
4284 	/*
4285 	 * Deliver messages in order from rtr queue to pending delivery queue
4286 	 */
4287 	for (i = 1; i <= range; i++) {
4288 
4289 		void *ptr = 0;
4290 
4291 		/*
4292 		 * If out of range of sort queue, stop assembly
4293 		 */
4294 		res = sq_in_range (&instance->regular_sort_queue,
4295 			my_high_delivered_stored + i);
4296 		if (res == 0) {
4297 			break;
4298 		}
4299 
4300 		res = sq_item_get (&instance->regular_sort_queue,
4301 			my_high_delivered_stored + i, &ptr);
4302 		/*
4303 		 * If hole, stop assembly
4304 		 */
4305 		if (res != 0 && skip == 0) {
4306 			break;
4307 		}
4308 
4309 		instance->my_high_delivered = my_high_delivered_stored + i;
4310 
4311 		if (res != 0) {
4312 			continue;
4313 
4314 		}
4315 
4316 		sort_queue_item_p = ptr;
4317 
4318 		mcast_in = sort_queue_item_p->mcast;
4319 		assert (mcast_in != (struct mcast *)0xdeadbeef);
4320 
4321 		endian_conversion_required = 0;
4322 		if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
4323 			endian_conversion_required = 1;
4324 			mcast_endian_convert (mcast_in, &mcast_header);
4325 		} else {
4326 			memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4327 		}
4328 
4329 		/*
4330 		 * Skip messages not originated in instance->my_deliver_memb
4331 		 */
4332 		if (skip &&
4333 			memb_set_subset (&mcast_header.system_from,
4334 				1,
4335 				instance->my_deliver_memb_list,
4336 				instance->my_deliver_memb_entries) == 0) {
4337 
4338 			instance->my_high_delivered = my_high_delivered_stored + i;
4339 
4340 			continue;
4341 		}
4342 
4343 		/*
4344 		 * Message found
4345 		 */
4346 		log_printf (instance->totemsrp_log_level_trace,
4347 			"Delivering MCAST message with seq %x to pending delivery queue",
4348 			mcast_header.seq);
4349 
4350 		/*
4351 		 * Message is locally originated multicast
4352 		 */
4353 		instance->totemsrp_deliver_fn (
4354 			mcast_header.header.nodeid,
4355 			((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4356 			sort_queue_item_p->msg_len - sizeof (struct mcast),
4357 			endian_conversion_required);
4358 	}
4359 }
4360 
4361 /*
4362  * recv message handler called when MCAST message type received
4363  */
4364 static int message_handler_mcast (
4365 	struct totemsrp_instance *instance,
4366 	const void *msg,
4367 	size_t msg_len,
4368 	int endian_conversion_needed)
4369 {
4370 	struct sort_queue_item sort_queue_item;
4371 	struct sq *sort_queue;
4372 	struct mcast mcast_header;
4373 
4374 	if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4375 		return (0);
4376 	}
4377 
4378 	if (endian_conversion_needed) {
4379 		mcast_endian_convert (msg, &mcast_header);
4380 	} else {
4381 		memcpy (&mcast_header, msg, sizeof (struct mcast));
4382 	}
4383 
4384 	if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4385 		sort_queue = &instance->recovery_sort_queue;
4386 	} else {
4387 		sort_queue = &instance->regular_sort_queue;
4388 	}
4389 
4390 	assert (msg_len <= FRAME_SIZE_MAX);
4391 
4392 #ifdef TEST_DROP_MCAST_PERCENTAGE
4393 	if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4394 		return (0);
4395 	}
4396 #endif
4397 
4398 	/*
4399 	 * If the message is foreign execute the switch below
4400 	 */
4401 	if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4402 		sizeof (struct memb_ring_id)) != 0) {
4403 
4404 		switch (instance->memb_state) {
4405 		case MEMB_STATE_OPERATIONAL:
4406 			memb_set_merge (
4407 				&mcast_header.system_from, 1,
4408 				instance->my_proc_list, &instance->my_proc_list_entries);
4409 			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4410 			break;
4411 
4412 		case MEMB_STATE_GATHER:
4413 			if (!memb_set_subset (
4414 				&mcast_header.system_from,
4415 				1,
4416 				instance->my_proc_list,
4417 				instance->my_proc_list_entries)) {
4418 
4419 				memb_set_merge (&mcast_header.system_from, 1,
4420 					instance->my_proc_list, &instance->my_proc_list_entries);
4421 				memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4422 				return (0);
4423 			}
4424 			break;
4425 
4426 		case MEMB_STATE_COMMIT:
4427 			/* discard message */
4428 			instance->stats.rx_msg_dropped++;
4429 			break;
4430 
4431 		case MEMB_STATE_RECOVERY:
4432 			/* discard message */
4433 			instance->stats.rx_msg_dropped++;
4434 			break;
4435 		}
4436 		return (0);
4437 	}
4438 
4439 	log_printf (instance->totemsrp_log_level_trace,
4440 		"Received ringid(%s:%lld) seq %x",
4441 		totemip_print (&mcast_header.ring_id.rep),
4442 		mcast_header.ring_id.seq,
4443 		mcast_header.seq);
4444 
4445 	/*
4446 	 * Add mcast message to rtr queue if not already in rtr queue
4447 	 * otherwise free io vectors
4448 	 */
4449 	if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4450 		sq_in_range (sort_queue, mcast_header.seq) &&
4451 		sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4452 
4453 		/*
4454 		 * Allocate new multicast memory block
4455 		 */
4456 // TODO LEAK
4457 		sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4458 		if (sort_queue_item.mcast == NULL) {
4459 			return (-1); /* error here is corrected by the algorithm */
4460 		}
4461 		memcpy (sort_queue_item.mcast, msg, msg_len);
4462 		sort_queue_item.msg_len = msg_len;
4463 
4464 		if (sq_lt_compare (instance->my_high_seq_received,
4465 			mcast_header.seq)) {
4466 			instance->my_high_seq_received = mcast_header.seq;
4467 		}
4468 
4469 		sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4470 	}
4471 
4472 	update_aru (instance);
4473 	if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4474 		messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4475 	}
4476 
4477 /* TODO remove from retrans message queue for old ring in recovery state */
4478 	return (0);
4479 }
4480 
4481 static int message_handler_memb_merge_detect (
4482 	struct totemsrp_instance *instance,
4483 	const void *msg,
4484 	size_t msg_len,
4485 	int endian_conversion_needed)
4486 {
4487 	struct memb_merge_detect memb_merge_detect;
4488 
4489 	if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4490 		return (0);
4491 	}
4492 
4493 	if (endian_conversion_needed) {
4494 		memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4495 	} else {
4496 		memcpy (&memb_merge_detect, msg,
4497 			sizeof (struct memb_merge_detect));
4498 	}
4499 
4500 	/*
4501 	 * do nothing if this is a merge detect from this configuration
4502 	 */
4503 	if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4504 		sizeof (struct memb_ring_id)) == 0) {
4505 
4506 		return (0);
4507 	}
4508 
4509 	/*
4510 	 * Execute merge operation
4511 	 */
4512 	switch (instance->memb_state) {
4513 	case MEMB_STATE_OPERATIONAL:
4514 		memb_set_merge (&memb_merge_detect.system_from, 1,
4515 			instance->my_proc_list, &instance->my_proc_list_entries);
4516 		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4517 		break;
4518 
4519 	case MEMB_STATE_GATHER:
4520 		if (!memb_set_subset (
4521 			&memb_merge_detect.system_from,
4522 			1,
4523 			instance->my_proc_list,
4524 			instance->my_proc_list_entries)) {
4525 
4526 			memb_set_merge (&memb_merge_detect.system_from, 1,
4527 				instance->my_proc_list, &instance->my_proc_list_entries);
4528 			memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4529 			return (0);
4530 		}
4531 		break;
4532 
4533 	case MEMB_STATE_COMMIT:
4534 		/* do nothing in commit */
4535 		break;
4536 
4537 	case MEMB_STATE_RECOVERY:
4538 		/* do nothing in recovery */
4539 		break;
4540 	}
4541 	return (0);
4542 }
4543 
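/*
 * memb_join_process() below is the consensus step of the gather phase:
 * a join whose proc and failed lists match ours records the sender's
 * agreement, and once consensus is reached this node builds and sends
 * the commit token if it is the lowest member of the configuration (or
 * if it is recovering from a fail-to-receive condition).  Joins that
 * only repeat information we already have are otherwise ignored (though
 * any join received while operational still triggers a transition to
 * gather), while joins carrying new members or failures are merged into
 * our lists and cause the gather state to be entered again.
 */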
4544 static void memb_join_process (
4545 	struct totemsrp_instance *instance,
4546 	const struct memb_join *memb_join)
4547 {
4548 	struct srp_addr *proc_list;
4549 	struct srp_addr *failed_list;
4550 	int gather_entered = 0;
4551 	int fail_minus_memb_entries = 0;
4552 	struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4553 
4554 	proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4555 	failed_list = proc_list + memb_join->proc_list_entries;
4556 
4557 /*
4558 	memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
4559 	memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
4560 	memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4561 	memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4562 */
4563 
4564 	if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
4565 		if (instance->flushing) {
4566 			if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4567 				log_printf (instance->totemsrp_log_level_warning,
4568 			    		"Discarding LEAVE message during flush, nodeid=%u",
4569 						memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
4570 				if (memb_join->failed_list_entries > 0) {
4571 					my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
4572 				}
4573 			} else {
4574 				log_printf (instance->totemsrp_log_level_warning,
4575 			    		"Discarding JOIN message during flush, nodeid=%u", memb_join->header.nodeid);
4576 			}
4577 			return;
4578 		} else {
4579 			if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4580 				log_printf (instance->totemsrp_log_level_debug,
4581 				    "Received LEAVE message from %u", memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
4582 				if (memb_join->failed_list_entries > 0) {
4583 					my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
4584 				}
4585 			}
4586 		}
4587 
4588 	}
4589 
4590 	if (memb_set_equal (proc_list,
4591 		memb_join->proc_list_entries,
4592 		instance->my_proc_list,
4593 		instance->my_proc_list_entries) &&
4594 
4595 	memb_set_equal (failed_list,
4596 		memb_join->failed_list_entries,
4597 		instance->my_failed_list,
4598 		instance->my_failed_list_entries)) {
4599 
4600 		memb_consensus_set (instance, &memb_join->system_from);
4601 
4602 		if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4603 				instance->failed_to_recv = 0;
4604 				srp_addr_copy (&instance->my_proc_list[0],
4605 					&instance->my_id);
4606 				instance->my_proc_list_entries = 1;
4607 				instance->my_failed_list_entries = 0;
4608 
4609 				memb_state_commit_token_create (instance);
4610 
4611 				memb_state_commit_enter (instance);
4612 				return;
4613 		}
4614 		if (memb_consensus_agreed (instance) &&
4615 			memb_lowest_in_config (instance)) {
4616 
4617 			memb_state_commit_token_create (instance);
4618 
4619 			memb_state_commit_enter (instance);
4620 		} else {
4621 			goto out;
4622 		}
4623 	} else
4624 	if (memb_set_subset (proc_list,
4625 		memb_join->proc_list_entries,
4626 		instance->my_proc_list,
4627 		instance->my_proc_list_entries) &&
4628 
4629 		memb_set_subset (failed_list,
4630 		memb_join->failed_list_entries,
4631 		instance->my_failed_list,
4632 		instance->my_failed_list_entries)) {
4633 
4634 		goto out;
4635 	} else
4636 	if (memb_set_subset (&memb_join->system_from, 1,
4637 		instance->my_failed_list, instance->my_failed_list_entries)) {
4638 
4639 		goto out;
4640 	} else {
4641 		memb_set_merge (proc_list,
4642 			memb_join->proc_list_entries,
4643 			instance->my_proc_list, &instance->my_proc_list_entries);
4644 
4645 		if (memb_set_subset (
4646 			&instance->my_id, 1,
4647 			failed_list, memb_join->failed_list_entries)) {
4648 
4649 			memb_set_merge (
4650 				&memb_join->system_from, 1,
4651 				instance->my_failed_list, &instance->my_failed_list_entries);
4652 		} else {
4653 			if (memb_set_subset (
4654 				&memb_join->system_from, 1,
4655 				instance->my_memb_list,
4656 				instance->my_memb_entries)) {
4657 
4658 				if (memb_set_subset (
4659 					&memb_join->system_from, 1,
4660 					instance->my_failed_list,
4661 					instance->my_failed_list_entries) == 0) {
4662 
4663 					memb_set_merge (failed_list,
4664 						memb_join->failed_list_entries,
4665 						instance->my_failed_list, &instance->my_failed_list_entries);
4666 				} else {
4667 					memb_set_subtract (fail_minus_memb,
4668 						&fail_minus_memb_entries,
4669 						failed_list,
4670 						memb_join->failed_list_entries,
4671 						instance->my_memb_list,
4672 						instance->my_memb_entries);
4673 
4674 					memb_set_merge (fail_minus_memb,
4675 						fail_minus_memb_entries,
4676 						instance->my_failed_list,
4677 						&instance->my_failed_list_entries);
4678 				}
4679 			}
4680 		}
4681 		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4682 		gather_entered = 1;
4683 	}
4684 
4685 out:
4686 	if (gather_entered == 0 &&
4687 		instance->memb_state == MEMB_STATE_OPERATIONAL) {
4688 
4689 		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4690 	}
4691 }
4692 
4693 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4694 {
4695 	int i;
4696 	struct srp_addr *in_proc_list;
4697 	struct srp_addr *in_failed_list;
4698 	struct srp_addr *out_proc_list;
4699 	struct srp_addr *out_failed_list;
4700 
4701 	out->header.type = in->header.type;
4702 	out->header.endian_detector = ENDIAN_LOCAL;
4703 	out->header.nodeid = swab32 (in->header.nodeid);
4704 	srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4705 	out->proc_list_entries = swab32 (in->proc_list_entries);
4706 	out->failed_list_entries = swab32 (in->failed_list_entries);
4707 	out->ring_seq = swab64 (in->ring_seq);
4708 
4709 	in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4710 	in_failed_list = in_proc_list + out->proc_list_entries;
4711 	out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4712 	out_failed_list = out_proc_list + out->proc_list_entries;
4713 
4714 	for (i = 0; i < out->proc_list_entries; i++) {
4715 		srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4716 	}
4717 	for (i = 0; i < out->failed_list_entries; i++) {
4718 		srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4719 	}
4720 }
4721 
4722 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4723 {
4724 	int i;
4725 	struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4726 	struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4727 	struct memb_commit_token_memb_entry *in_memb_list;
4728 	struct memb_commit_token_memb_entry *out_memb_list;
4729 
4730 	out->header.type = in->header.type;
4731 	out->header.endian_detector = ENDIAN_LOCAL;
4732 	out->header.nodeid = swab32 (in->header.nodeid);
4733 	out->token_seq = swab32 (in->token_seq);
4734 	totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4735 	out->ring_id.seq = swab64 (in->ring_id.seq);
4736 	out->retrans_flg = swab32 (in->retrans_flg);
4737 	out->memb_index = swab32 (in->memb_index);
4738 	out->addr_entries = swab32 (in->addr_entries);
4739 
4740 	in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4741 	out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4742 	for (i = 0; i < out->addr_entries; i++) {
4743 		srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4744 
4745 		/*
4746 		 * Only convert the memb entry if it has been set
4747 		 */
4748 		if (in_memb_list[i].ring_id.rep.family != 0) {
4749 			totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
4750 				     &in_memb_list[i].ring_id.rep);
4751 
4752 			out_memb_list[i].ring_id.seq =
4753 				swab64 (in_memb_list[i].ring_id.seq);
4754 			out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4755 			out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4756 			out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4757 		}
4758 	}
4759 }
4760 
4761 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4762 {
4763 	int i;
4764 
4765 	out->header.type = in->header.type;
4766 	out->header.endian_detector = ENDIAN_LOCAL;
4767 	out->header.nodeid = swab32 (in->header.nodeid);
4768 	out->seq = swab32 (in->seq);
4769 	out->token_seq = swab32 (in->token_seq);
4770 	out->aru = swab32 (in->aru);
4771 	totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4772 	out->aru_addr = swab32(in->aru_addr);
4773 	out->ring_id.seq = swab64 (in->ring_id.seq);
4774 	out->fcc = swab32 (in->fcc);
4775 	out->backlog = swab32 (in->backlog);
4776 	out->retrans_flg = swab32 (in->retrans_flg);
4777 	out->rtr_list_entries = swab32 (in->rtr_list_entries);
4778 	for (i = 0; i < out->rtr_list_entries; i++) {
4779 		totemip_copy_endian_convert(&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
4780 		out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4781 		out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4782 	}
4783 }
4784 
4785 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4786 {
4787 	out->header.type = in->header.type;
4788 	out->header.endian_detector = ENDIAN_LOCAL;
4789 	out->header.nodeid = swab32 (in->header.nodeid);
4790 	out->header.encapsulated = in->header.encapsulated;
4791 
4792 	out->seq = swab32 (in->seq);
4793 	out->this_seqno = swab32 (in->this_seqno);
4794 	totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4795 	out->ring_id.seq = swab64 (in->ring_id.seq);
4796 	out->node_id = swab32 (in->node_id);
4797 	out->guarantee = swab32 (in->guarantee);
4798 	srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4799 }
4800 
4801 static void memb_merge_detect_endian_convert (
4802 	const struct memb_merge_detect *in,
4803 	struct memb_merge_detect *out)
4804 {
4805 	out->header.type = in->header.type;
4806 	out->header.endian_detector = ENDIAN_LOCAL;
4807 	out->header.nodeid = swab32 (in->header.nodeid);
4808 	totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
4809 	out->ring_id.seq = swab64 (in->ring_id.seq);
4810 	srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4811 }
4812 
4813 static int ignore_join_under_operational (
4814 	struct totemsrp_instance *instance,
4815 	const struct memb_join *memb_join)
4816 {
4817 	struct srp_addr *proc_list;
4818 	struct srp_addr *failed_list;
4819 	unsigned long long ring_seq;
4820 
4821 	proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4822 	failed_list = proc_list + memb_join->proc_list_entries;
4823 	ring_seq = memb_join->ring_seq;
4824 
4825 	if (memb_set_subset (&instance->my_id, 1,
4826 	    failed_list, memb_join->failed_list_entries)) {
4827 		return (1);
4828 	}
4829 
4830 	/*
4831 	 * In operational state, my_proc_list is exactly the same as
4832 	 * my_memb_list.
4833 	 */
4834 	if ((memb_set_subset (&memb_join->system_from, 1,
4835 	    instance->my_memb_list, instance->my_memb_entries)) &&
4836 	    (ring_seq < instance->my_ring_id.seq)) {
4837 		return (1);
4838 	}
4839 
4840 	return (0);
4841 }
4842 
4843 static int message_handler_memb_join (
4844 	struct totemsrp_instance *instance,
4845 	const void *msg,
4846 	size_t msg_len,
4847 	int endian_conversion_needed)
4848 {
4849 	const struct memb_join *memb_join;
4850 	struct memb_join *memb_join_convert = alloca (msg_len);
4851 
4852 	if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4853 		return (0);
4854 	}
4855 
4856 	if (endian_conversion_needed) {
4857 		memb_join = memb_join_convert;
4858 		memb_join_endian_convert (msg, memb_join_convert);
4859 
4860 	} else {
4861 		memb_join = msg;
4862 	}
4863 	/*
4864 	 * If the process paused because it wasn't scheduled in a timely
4865 	 * fashion, flush the join messages because they may be stale
4866 	 * entries queued up while the process was paused
4867 	 */
	if (pause_flush (instance)) {
		return (0);
	}

	if (instance->token_ring_id_seq < memb_join->ring_seq) {
		instance->token_ring_id_seq = memb_join->ring_seq;
	}
	switch (instance->memb_state) {
		case MEMB_STATE_OPERATIONAL:
			if (!ignore_join_under_operational (instance, memb_join)) {
				memb_join_process (instance, memb_join);
			}
			break;

		case MEMB_STATE_GATHER:
			memb_join_process (instance, memb_join);
			break;

		case MEMB_STATE_COMMIT:
			if (memb_set_subset (&memb_join->system_from,
				1,
				instance->my_new_memb_list,
				instance->my_new_memb_entries) &&

				memb_join->ring_seq >= instance->my_ring_id.seq) {

				memb_join_process (instance, memb_join);
				memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
			}
			break;

		case MEMB_STATE_RECOVERY:
			if (memb_set_subset (&memb_join->system_from,
				1,
				instance->my_new_memb_list,
				instance->my_new_memb_entries) &&

				memb_join->ring_seq >= instance->my_ring_id.seq) {

				memb_join_process (instance, memb_join);
				memb_recovery_state_token_loss (instance);
				memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
			}
			break;
	}
	return (0);
}

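/*
 * Handle a received commit token.  In the gather state the token is
 * accepted when its address list matches this node's proc list minus the
 * failed list and it carries a newer ring sequence number; in the commit
 * state it triggers the transition to recovery once it has traversed the
 * full address list; in the recovery state the ring representative uses
 * it to originate the initial ORF token.  In the operational state the
 * token is discarded.
 */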
static int message_handler_memb_commit_token (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
	struct memb_commit_token *memb_commit_token;
	struct srp_addr sub[PROCESSOR_COUNT_MAX];
	int sub_entries;

	struct srp_addr *addr;

	log_printf (instance->totemsrp_log_level_debug,
		"got commit token");

	if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
		return (0);
	}

	if (endian_conversion_needed) {
		memb_commit_token_endian_convert (msg, memb_commit_token_convert);
	} else {
		memcpy (memb_commit_token_convert, msg, msg_len);
	}
	memb_commit_token = memb_commit_token_convert;
	addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;

#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
	if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
		return (0);
	}
#endif
	switch (instance->memb_state) {
		case MEMB_STATE_OPERATIONAL:
			/* discard token */
			break;

		case MEMB_STATE_GATHER:
			memb_set_subtract (sub, &sub_entries,
				instance->my_proc_list, instance->my_proc_list_entries,
				instance->my_failed_list, instance->my_failed_list_entries);

			if (memb_set_equal (addr,
				memb_commit_token->addr_entries,
				sub,
				sub_entries) &&

				memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
				memcpy (instance->commit_token, memb_commit_token, msg_len);
				memb_state_commit_enter (instance);
			}
			break;

		case MEMB_STATE_COMMIT:
			/*
			 * If retransmitted commit tokens are sent on this ring,
			 * filter them out and only enter recovery once the
			 * commit token has traversed the full address list,
			 * which is the case when:
			 * memb_commit_token->memb_index == memb_commit_token->addr_entries
			 */
			if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
				memb_commit_token->memb_index == memb_commit_token->addr_entries) {
				memb_state_recovery_enter (instance, memb_commit_token);
			}
			break;

		case MEMB_STATE_RECOVERY:
			if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {

				/* Filter out duplicated tokens */
				if (instance->originated_orf_token) {
					break;
				}

				instance->originated_orf_token = 1;

				log_printf (instance->totemsrp_log_level_debug,
					"Sending initial ORF token");

				// TODO convert instead of initiate
				orf_token_send_initial (instance);
				reset_token_timeout (instance); // REVIEWED
				reset_token_retransmit_timeout (instance); // REVIEWED
			}
			break;
	}
	return (0);
}

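/*
 * Handle a token hold cancel message.  If it refers to the current ring,
 * clear my_seq_unchanged, which cancels token hold on this node, and if
 * this node is the ring representative retransmit the token immediately.
 */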
static int message_handler_token_hold_cancel (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed)
{
	const struct token_hold_cancel *token_hold_cancel = msg;

	if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
		return (0);
	}

	if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
		sizeof (struct memb_ring_id)) == 0) {

		instance->my_seq_unchanged = 0;
		if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
			timer_function_token_retransmit_timeout (instance);
		}
	}
	return (0);
}

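/*
 * Entry point for every message delivered by the transport layer below.
 * Messages shorter than a message header or with an unknown type are
 * dropped; everything else is counted in the receive statistics and then
 * dispatched through totemsrp_message_handlers, with a flag indicating
 * whether endian conversion is required.
 */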
void main_deliver_fn (
	void *context,
	const void *msg,
	unsigned int msg_len)
{
	struct totemsrp_instance *instance = context;
	const struct message_header *message_header = msg;

	if (msg_len < sizeof (struct message_header)) {
		log_printf (instance->totemsrp_log_level_security,
			    "Received message is too short...  ignoring %u.",
			    (unsigned int)msg_len);
		return;
	}

	switch (message_header->type) {
	case MESSAGE_TYPE_ORF_TOKEN:
		instance->stats.orf_token_rx++;
		break;
	case MESSAGE_TYPE_MCAST:
		instance->stats.mcast_rx++;
		break;
	case MESSAGE_TYPE_MEMB_MERGE_DETECT:
		instance->stats.memb_merge_detect_rx++;
		break;
	case MESSAGE_TYPE_MEMB_JOIN:
		instance->stats.memb_join_rx++;
		break;
	case MESSAGE_TYPE_MEMB_COMMIT_TOKEN:
		instance->stats.memb_commit_token_rx++;
		break;
	case MESSAGE_TYPE_TOKEN_HOLD_CANCEL:
		instance->stats.token_hold_cancel_rx++;
		break;
	default:
		log_printf (instance->totemsrp_log_level_security,
			    "Type of received message is wrong...  ignoring %d.",
			    (int)message_header->type);
		instance->stats.rx_msg_dropped++;
		return;
	}
	/*
	 * Handle incoming message
	 */
	totemsrp_message_handlers.handler_functions[(int)message_header->type] (
		instance,
		msg,
		msg_len,
		message_header->endian_detector != ENDIAN_LOCAL);
}

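/*
 * Called when an interface becomes available or changes address.  The
 * local identity and membership entry are updated for that interface; on
 * the first change the ring id is created or loaded through the
 * registered callback and the service-ready callback, if any, is invoked.
 * The configured members of the interface are added, and once every
 * configured interface has reported in, the gather state is entered.
 */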
void main_iface_change_fn (
	void *context,
	const struct totem_ip_address *iface_addr,
	unsigned int iface_no)
{
	struct totemsrp_instance *instance = context;
	int i;

	totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
	assert (instance->my_id.addr[iface_no].nodeid);

	totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);

	if (instance->iface_changes++ == 0) {
		instance->memb_ring_id_create_or_load (&instance->my_ring_id,
		    &instance->my_id.addr[0]);
		instance->token_ring_id_seq = instance->my_ring_id.seq;
		log_printf (
			instance->totemsrp_log_level_debug,
			"Created or loaded sequence id %llx.%s for this ring.",
			instance->my_ring_id.seq,
			totemip_print (&instance->my_ring_id.rep));

		if (instance->totemsrp_service_ready_fn) {
			instance->totemsrp_service_ready_fn ();
		}
	}

	for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
		totemsrp_member_add (instance,
			&instance->totem_config->interfaces[iface_no].member_list[i],
			iface_no);
	}

	if (instance->iface_changes >= instance->totem_config->interface_count) {
		memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
	}
}

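/*
 * Reduce the configured network MTU by the size of the multicast header,
 * leaving room for the header that totemsrp prepends to each packet.
 */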
void totemsrp_net_mtu_adjust (struct totem_config *totem_config)
{
	totem_config->net_mtu -= sizeof (struct mcast);
}

void totemsrp_service_ready_register (
	void *context,
	void (*totem_service_ready) (void))
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;

	instance->totemsrp_service_ready_fn = totem_service_ready;
}

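/*
 * totemsrp_member_add and totemsrp_member_remove are thin wrappers that
 * forward membership configuration changes to the redundant ring layer
 * (totemrrp) for the given ring number.
 */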
int totemsrp_member_add (
	void *context,
	const struct totem_ip_address *member,
	int ring_no)
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
	int res;

	res = totemrrp_member_add (instance->totemrrp_context, member, ring_no);

	return (res);
}

int totemsrp_member_remove (
	void *context,
	const struct totem_ip_address *member,
	int ring_no)
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
	int res;

	res = totemrrp_member_remove (instance->totemrrp_context, member, ring_no);

	return (res);
}

void totemsrp_threaded_mode_enable (void *context)
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;

	instance->threaded_mode_enabled = 1;
}

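/*
 * Acknowledge the transition to a new configuration: clear
 * waiting_trans_ack and inform the registered callback that totemsrp is
 * no longer waiting for the transitional acknowledgement.
 */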
void totemsrp_trans_ack (void *context)
{
	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;

	instance->waiting_trans_ack = 0;
	instance->totemsrp_waiting_trans_ack_cb_fn (0);
}