1 /* Copyright (c) 2012, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
22 
23 /** \file The new version of xcom is a major rewrite to allow
24     transmission of multiple messages from several sources
25     simultaneously without collision. The interface to xcom is largely
26     intact, one notable change is that xcom will consider the message
27     delivered as soon as it has got a majority. Consequently, the VP
28     set will not necessarily show all nodes which will actually
29     receive the message.
30 
31     OHKFIX Add wait for complete last known node set to mimic the old
32     semantics.
33 
34 
35     IMPORTANT: What xcom does and what it does not do:
36 
37     xcom messages are received in the same order on all nodes.
38 
39     xcom guarantees that if a message is delivered to one node, it will
40     eventually be seen on all other nodes as well.
41 
42     xcom messages are available to a crashed node when it comes up
43     again if at least one node which knows the value of the message
44     has not crashed. The size of the message cache is configurable.
45 
46     OHKFIX Add logging to disk to make messages durable across system
47     crash and to increase the number of messages which may be cached.
48 
49     There is no guarantee whatsoever about the order of messages from
50     different nodes, not even the order of multiple messages from the
51     same node. It is up to the client to impose such an order by
52     waiting on a message before it sends the next.
53 
54     xcom can notify the client that a message has timed out, and in
55     that case will try to cancel the message, but it cannot guarantee
56     that a message which has timed out will not be delivered.
57 
58     xcom attaches a node set to each message as it is delivered to the
59     client. This node set reflects the current node set that xcom
60     believes is active, it does not mean that the message has been
61     delivered yet to all nodes in the set. Neither does it mean that
62     the message has not been delivered to the nodes not in the set.
63 
64     A cache of Paxos state machines is central to the new design. The
65     purpose of the cache is both to store a window of messages, and to
66     decouple the different parts of xcom, like message proposal,
67     message delivery and execution, and recovery.  The old cache was
68     limited to caching messages, and a single state machine ran the
69     combined VP and Paxos algorithm. This constrained xcom to deliver
70     only a single message at a time.
71 
72     Each instance of the Paxos state machine implements the basic
73     Paxos protocol.  Unlike the cache in the old system, it is not
74     cleared when a site is deleted.  This removes some problems
75     related to message delivery during site deletion.  The cache is a
76     classic fixed size LRU with a hash index.
77 
    Some extensions to the basic Paxos algorithm have been implemented:
79 
80     A node has ownership to all synodes with its own node number. Only
81     a node with node number N can propose a value for synode {X N},
82     where X is the sequence number, and N is the node number. Other
83     nodes can only propose the special value no_op for synode {X N}.
84     The reason for this is to retain the leaderless Paxos algorithm,
85     but to avoid collisions between nodes which are competing for the
86     same synode number. With this scheme, each node has its own unique
87     number series during normal operation. The scheme has the
88     following implications:
89 
90     1. If a node N has not already proposed a value for the synode {X N},
91     it may at any time send a LEARN message to the other nodes with
92     the reserved value no_op, without going through phase 1 and 2 of
93     Paxos. This is because the other nodes are constrained to propose
94     no_op for this synode, so the final outcome will always be no_op.
95     To avoid unnecessary message transmission, a node will try to
96     broadcast the no_op LEARN messages by piggybacking the information
97     on the messages of the basic Paxos protocol.
98 
99     2. Other nodes which want to find the value of synode {X N} may do
100     so by trying to get the value no_op accepted by following the
101     basic Paxos algorithm. The result will be the actual value
102     proposed by node N if it has done so, otherwise no_op. This will
103     typically only be necessary when a node is down, and the other
104     nodes need to find the values from the missing node in order to be
105     able to continue execution.
106 
107     Messages are delivered in order to the client, and the order is
108     determined by the sequence number and the node number, with the
109     sequence number as the most significant part.
110 
111     The xcom network interface has been redesigned and is now
112     implemented directly on top of TCP, and has so far been completely
113     trouble free. We use poll() or select() to implement non-blocking
114     send and receive, but libev could equally well have been used.
115 
116     Multicast is implemented on top of unicast as before, but the
117     implementation is prepared to use real multicast with relatively
118     minor changes.
119 
120     The roles of proposer, acceptor/learner, and executor are now
121     directly mapped to unique task types which interact with the Paxos
122     state machines, whereas the previous implementation folded all the
123     roles into a single event driven state machine.
124 
125     The following terminology will be used:
126 
127     A node is an instance of the xcom thread. There is only one instance
128     of the xcom thread in the agent.
129     A client is the application which is using xcom to send messages.
130     A thread is a real OS thread.
131     A task is a logical process. It is implemented by coroutines and
132     an explicit stack.
133 
134     The implementation of tasks and non-blocking socket operations is
135     isolated in task.h and task.c.
136 
137     A node will open a tcp connection to each of the other nodes. This
138     connection is used for all communication initiated by the node,
139     and replies to messages will arrive on the connection on which it
140     was sent.
141 
142     static int tcp_server(task_arg);
143 
144     The tcp_server listens on the xcom port and starts an
145     acceptor_learner_task whenever a new connection is detected.
146 
147     static int tcp_reaper_task(task_arg);
148 
    Closes tcp connections which have been unused for too long.
150 
151     static int sender_task(task_arg);
152 
    The sender_task waits for tcp messages on its input queue and
    sends them on the tcp socket. If the socket is closed for any
155     reason, the sender_task will reconnect the socket. There is one
156     sender_task for each socket. The sender task exists mainly to
157     simplify the logic in the other tasks, but it could have been
158     replaced with a coroutine which handles the connection logic after
159     having reserved the socket for its client task.
160 
161     static int generator_task(task_arg);
162 
163     The generator_task reads messages from the client queue and moves
164     them into the input queue of the proposer_task.
165 
166     OHKFIX Use a tcp socket instead of the client queue. We can then
167     remove the generator_task and let the acceptor_learner_task do the
168     dispatching.
169 
170     static int proposer_task(task_arg);
171 
172     Assign a message number to an outgoing message and try to get it
173     accepted. There may be several proposer tasks on each node
174     working in parallel. If there are multiple proposer tasks, xcom can
175     not guarantee that the messages will be sent in the same order as
176     received from the client.
177 
178     static int acceptor_learner_task(task_arg);
179 
180     This is the server part of the xcom thread. There is one
    acceptor_learner_task for each node in the system. The
    acceptor_learner_task reads messages from the socket, finds the correct
183     Paxos state machine, and dispatches to the correct message handler
184     with the state machine and message as arguments.
185 
186     static int reply_handler_task(task_arg);
187 
188     The reply_handler_task does the same job as the
189     acceptor_learner_task, but listens on the socket which the node
190     uses to send messages, so it will handle only replies on that
191     socket.
192 
193     static int executor_task(task_arg);
194 
    The executor_task waits for a Paxos message to be accepted. When
196     the message is accepted, it is delivered to the client,
197     unless it is a no-op. In either case, the executor_task steps to
198     the next message and repeats the wait. If it times out waiting for
199     a message, it will try to get a no-op accepted.
200 
201     static int alive_task(task_arg);
202 
203     Sends i-am-alive to other nodes if there has been no normal traffic
204     for a while. It also pings nodes which seem to be inactive.
205 
206     static int detector_task(task_arg);
207 
208     The detector_task periodically scans the set of connections from
209     other nodes and sees if there has been any activity. If there has
210     been no activity for some time, it will assume that the node is
211     dead, and send a view message to the client.
212 
213     static int boot_task(task_arg);
214 
215     The boot task is started whenever xcom has no site definition. It
216     listens on the input queue until it detects either a boot or
217     recovery. In case of a boot, it will wait for a unified boot
    message.  In case of local recovery, it will wait until it has seen
    all recover messages.  In both cases, the proposer task will try
    to get those messages accepted.
221 
222     static int boot_killer_task(task_arg);
223 
224     Abort the boot process if there is no progress.
225 
226     static int net_boot_task(task_arg);
227     static int net_recover_task(task_arg);
228 
229     Reconfiguration:
230 
231     The xcom reconfiguration process is essentially the one described in
232     "Reconfiguring a State Machine" by Lamport et al. as the R-alpha
233     algorithm.
234     We execute the reconfiguration command immediately, but the config is
235     only valid after a delay of alpha messages.
    The parameter alpha is the same as the event horizon (the
    event_horizon variable) in this implementation; see the static
    function too_far().
238     All tcp messages from beyond the event horizon will be ignored.
239 
240 */
241 
242 #include "x_platform.h"
243 
244 #include <stdio.h>
245 #include <string.h>
246 #include <errno.h>
247 #include <stdlib.h>
248 #include <sys/types.h>
249 #include <assert.h>
250 #include <signal.h>
251 #include <sys/time.h>
252 #include <limits.h>
253 
254 #ifndef WIN
255 #include <sys/socket.h>
256 #include <netdb.h>
257 #include <sys/ioctl.h>
258 #include <net/if.h>
259 #ifndef __linux__
260 #include <sys/sockio.h>
261 #endif
262 #endif
263 
264 #if defined(WIN32) || defined(WIN64)
265 #include <windows.h>
266 #endif
267 
268 #ifndef _WIN32
269 #include <poll.h>
270 #endif
271 
272 #include "xdr_utils.h"
273 #include "xcom_common.h"
274 
275 #include "task_os.h"
276 
277 #include "xcom_vp.h"
278 
279 #include "simset.h"
280 #include "app_data.h"
281 
282 #include "task.h"
283 #include "node_no.h"
284 #include "server_struct.h"
285 #include "xcom_detector.h"
286 #include "site_struct.h"
287 #include "xcom_transport.h"
288 #include "xcom_base.h"
289 
290 #ifdef XCOM_HAVE_OPENSSL
291 #include "xcom_ssl_transport.h"
292 #endif
293 
294 #include "task.h"
295 #include "task_net.h"
296 #include "task_debug.h"
297 #include "xcom_statistics.h"
298 #include "node_set.h"
299 #include "node_list.h"
300 #include "bitset.h"
301 
302 #include "xcom_cache.h"
303 
304 #include "xcom_vp_str.h"
305 #include "pax_msg.h"
306 #include "xcom_msg_queue.h"
307 #include "xcom_recover.h"
308 #include "synode_no.h"
309 #include "sock_probe.h"
310 #include "xcom_interface.h"
311 #include "xcom_memory.h"
312 #include "site_def.h"
313 #include "xcom_cfg.h"
314 
315 #ifdef XCOM_HAVE_OPENSSL
316 #include "openssl/ssl.h"
317 #endif
318 
/* {{{ Defines and constants */

#define SYS_STRERROR_SIZE 512
#define TERMINATE_DELAY 3.0
/* Initial value of event_horizon. */
#define EVENT_HORIZON_MIN 10
/*
  Event horizon: the "alpha" of the R-alpha reconfiguration scheme
  described at the top of this file. Starts at EVENT_HORIZON_MIN;
  set_event_horizon() can change it.
*/
unsigned int event_horizon = EVENT_HORIZON_MIN;
325 
326 static void set_event_horizon(unsigned int eh) MY_ATTRIBUTE((unused));
327 /* purecov: begin deadcode */
set_event_horizon(unsigned int eh)328 static void set_event_horizon(unsigned int eh)
329 {
330 	DBGOUT(FN; NDBG(eh,u));
331 	event_horizon = eh;
332 }
333 /* purecov: end */
334 
/* The number of proposers on one node (proposer tasks started by create_proposers()) */
#define PROPOSERS 10

/* Limit the number of acceptors: if defined, max_check() caps the node count at this value */
/* #define MAXACCEPT 5  */

/* Skip prepare for first ballot (0 = skip; NOTE(review): confirm against its uses) */
int const threephase = 0;

/* Error injection for testing */
#define INJECT_ERROR 0

/* Crash a node early */
/* #define CRASH 1 */

/* }}} */


#include "retry.h"

/* #define USE_EXIT_TYPE */
/* #define NO_SWEEPER_TASK */

/* Upper limit on the batch size */
enum{
	MAX_BATCH_SIZE = 0x3fffffff
};

/* When nonzero, majority() treats any two-node configuration as having a majority */
int ARBITRATOR_HACK = 0;
static int AUTOBATCH = 1;
#define AGGRESSIVE_SWEEP

static int const no_duplicate_payload = 1;

/* Use buffered read when reading messages from the network */
static int use_buffered_read = 1;

/* Used to handle OOM errors */
static unsigned short oom_abort = 0;
374 
375 /* {{{ Forward declarations */
376 
377 long	get_unique_long(void);
378 unsigned long	msg_count(app_data_ptr a);
379 void	get_host_name(char *a, char *name);
380 
381 static double	wakeup_delay(double old);
382 
383 /* Task types */
384 static int	proposer_task(task_arg arg);
385 static int	executor_task(task_arg arg);
386 static int	sweeper_task(task_arg arg);
387 extern int	alive_task(task_arg arg);
388 static int	generator_task(task_arg arg);
389 extern int	detector_task(task_arg arg);
390 
391 static int	finished(pax_machine *p);
392 static int	accepted(pax_machine *p);
393 static int	started(pax_machine *p);
394 static synode_no first_free_synode(synode_no msgno);
395 static void free_forced_config_site_def();
396 
397 extern void	bit_set_or(bit_set *x, bit_set const *y);
398 
399 /* }}} */
400 
401 /* {{{ Global variables */
402 
403 int	xcom_shutdown = 0; /* Xcom_Shutdown flag */
404 synode_no executed_msg;    /* The message we are waiting to execute */
405 synode_no max_synode;      /* Max message number seen so far */
406 task_env *boot = NULL;
407 task_env *detector = NULL;
408 task_env *killer = NULL;
409 task_env *net_boot = NULL;
410 task_env *net_recover = NULL;
411 void	*xcom_thread_input = 0;
412 
413 static void	init_proposers();
414 
init_base_vars()415 void	init_base_vars()
416 {
417 	xcom_shutdown = 0; /* Xcom_Shutdown flag */
418 	executed_msg = null_synode; /* The message we are waiting to execute */
419 	max_synode = null_synode; /* Max message number seen so far */
420 	boot = NULL;
421 	detector = NULL;
422 	killer = NULL;
423 	net_boot = NULL;
424 	net_recover = NULL;
425 	xcom_thread_input = 0;
426 }
427 
428 static task_env *executor = NULL;
429 static task_env *sweeper = NULL;
430 static task_env *retry = NULL;
431 static task_env *proposer[PROPOSERS];
432 static task_env *alive_t = NULL;
433 
434 static uint32_t	my_id = 0; /* Unique id of this instance */
435 static synode_no current_message; /* Current message number */
436 static synode_no last_config_modification_id; /*Last configuration change proposal*/
437 
get_current_message()438 synode_no get_current_message()
439 {
440 	return current_message;
441 }
442 
443 static channel prop_input_queue;         /* Proposer task input queue */
444 
445 /* purecov: begin deadcode */
get_prop_input_queue()446 channel *get_prop_input_queue()
447 {
448 	return & prop_input_queue;
449 }
450 /* purecov: end */
451 
extern int	client_boot_done;
extern int	netboot_ok;
extern int	booting;

extern start_t start_type;
/* Initialized as an empty list whose links point back to itself. */
static linkage exec_wait = {0,&exec_wait, &exec_wait};           /* Executor will wake up tasks sleeping here */

/*
#define IGNORE_LOSERS
*/

#define BUILD_TIMEOUT 3.0

/*
  Ring buffer of the last MAX_DEAD site ids passed to bury_site().
  n is the next slot to overwrite. A zero id marks a slot that has never
  been filled; bury_site() never stores 0.
*/
#define MAX_DEAD 10
static struct {
	int	n;
	unsigned long	id[MAX_DEAD];
} dead_sites;
470 
get_max_synode()471 synode_no get_max_synode()
472 {
473 	return max_synode;
474 }
475 
476 static
synode_set_to_event_horizon(synode_no * s)477 void synode_set_to_event_horizon(synode_no *s)
478 {
479   s->msgno += event_horizon + 1;
480   s->node= 0;
481 }
482 
483 
484 /**
485    Set node group
486 */
set_group(uint32_t id)487 void	set_group(uint32_t id)
488 {
489 	MAY_DBG(FN; STRLIT("changing group id of global variables "); NDBG(id,lx););
490 /*	set_group_id(id); */
491 	current_message.group_id = id;
492 	executed_msg.group_id = id;
493 	max_synode.group_id = id;
494 	set_log_group_id(id);
495 }
496 
497 
bury_site(uint32_t id)498 static void	bury_site(uint32_t id)
499 {
500 	if (id != 0) {
501 		dead_sites.id[dead_sites.n % MAX_DEAD] = id;
502 		dead_sites.n = (dead_sites.n + 1) % MAX_DEAD;
503 	}
504 }
505 
506 
is_dead_site(uint32_t id)507 static bool_t is_dead_site(uint32_t id)
508 {
509 	int	i = 0;
510 	for (i = 0; i < MAX_DEAD; i++) {
511 		if (dead_sites.id[i] == id)
512 			return TRUE;
513 		else if (dead_sites.id[i] == 0)
514 			return FALSE;
515 	}
516 	return FALSE;
517 }
518 
extern node_set *init_node_set(node_set *set, u_int n);
extern node_set *alloc_node_set(node_set *set, u_int n);

/* Disabled (not compiled); kept for reference only. */
#if 0
/* Find our previous message number. */
static synode_no decr_msgno(synode_no msgno)
{
	synode_no ret = msgno;
	ret.msgno--;
	ret.node = get_nodeno(find_site_def(ret)); /* In case site and node number has changed */
	return ret;
}
#endif
532 
533 /* Find our next message number. */
incr_msgno(synode_no msgno)534 static synode_no incr_msgno(synode_no msgno)
535 {
536 	synode_no ret = msgno;
537 	ret.msgno++;
538 	ret.node = get_nodeno(find_site_def(ret)); /* In case site and node number has changed */
539 	return ret;
540 }
541 
/* Disabled (not compiled); kept for reference only. */
#if 0
/* Given message number, compute which node it belongs to */
static unsigned int	msgno_to_node(synode_no msgno)
{
	return msgno.node;
}
#endif
549 
/*
  Return the synode after synode. This is the next node number under the
  same message number. Once the node count of synode's site definition is
  reached, it wraps to node 0 of the next message number.
*/
synode_no incr_synode(synode_no synode)
{
	synode_no next = synode;

	next.node++;
	if (next.node < get_maxnodes(find_site_def(synode)))
		return next;

	/* Past the last node: continue with the next message number. */
	next.node = 0;
	next.msgno++;
	return next; /* Change this if we change message number type */
}
561 
562 
/*
  Return the synode before synode. This is the previous node number under
  the same message number. From node 0, it steps back to the last node
  (per the site definition of the decremented synode) of the previous
  message number.
*/
synode_no decr_synode(synode_no synode)
{
	synode_no prev = synode;

	if (prev.node != 0) {
		prev.node--;
		return prev;
	}

	prev.msgno--;
	prev.node = get_maxnodes(find_site_def(prev));
	prev.node--;
	return prev; /* Change this if we change message number type */
}
573 
574 
/* Rewrite p in place as a learn_op message of type no_op. */
static void	skip_value(pax_msg *p)
{
	MAY_DBG(FN; SYCEXP(p->synode));
	p->msg_type = no_op;
	p->op = learn_op;
}
581 
582 
583 /* }}} */
584 
585 /* {{{ Utilities and debug */
586 
587 /* purecov: begin deadcode */
588 /* Print message and exit */
pexitall(int i)589 static void	pexitall(int i)
590 {
591 	int	*r = (int*)calloc(1, sizeof(int));
592 	*r = i;
593 	DBGOUT(FN; NDBG(i, d); STRLIT("time "); NDBG(task_now(), f); );
594 	XCOM_FSM(xa_terminate, int_arg(i));	/* Tell xcom to stop */
595 }
596 /* purecov: end */
597 
598 #ifndef WIN
599 /* Ignore this signal */
ignoresig(int signum)600 static int	ignoresig(int signum)
601 {
602 	struct sigaction act;
603 	struct sigaction oldact;
604 
605 	memset(&act, 0, sizeof(act));
606 	act.sa_handler = SIG_IGN;
607 	memset(&oldact, 0, sizeof(oldact));
608 
609 	return sigaction(signum,
610 	    &act,
611 	    &oldact);
612 }
613 #else
614 #define SIGPIPE 0
ignoresig(int signum)615 static int	ignoresig(int signum)
616 {
617 	return 0;
618 }
619 #endif
620 
621 /* }}} */
622 
/* Disabled (not compiled): would print a Paxos machine and a message on one debug line. */
#if 0
static void	dbg_machine_and_msg(pax_machine *p, pax_msg *pm)
{
  GET_GOUT;
  STRLIT("machine ");
  ADD_GOUT(dbg_pax_machine(p));
  STRLIT(" ");
  STRLIT("msg ");
  COPY_AND_FREE_GOUT(dbg_pax_msg(pm));
  PRINT_GOUT;
  FREE_GOUT;
}


#endif
638 
/*
  Nonzero if p has a nonzero last_modified time and
  last_modified + 0.5 + median_time() is still later than task_now().
*/
static int	recently_active(pax_machine *p)
{
	MAY_DBG(FN;
			SYCEXP(p->synode); STRLIT(" op ");
            PTREXP(p); STRLIT(p->learner.msg ? pax_op_to_str(p->learner.msg->op) : "NULL");
	    NDBG(p->last_modified, f); NDBG(task_now(), f));
	if (p->last_modified == 0.0)
		return 0; /* Never modified */
	return (p->last_modified + 0.5 + median_time()) > task_now();
}
647 
648 
/*
  Nonzero once p's learner holds a message whose op is learn_op or
  tiny_learn_op.
*/
static inline int	finished(pax_machine *p)
{
	MAY_DBG(FN;
	    SYCEXP(p->synode); STRLIT(" op ");
	    PTREXP(p); STRLIT(p->learner.msg ? pax_op_to_str(p->learner.msg->op) : "NULL");
	    );
	if (p->learner.msg == NULL)
		return 0;
	switch (p->learner.msg->op) {
	case learn_op:
	case tiny_learn_op:
		return 1;
	default:
		return 0;
	}
}


/* Externally visible wrapper around finished(). */
int	pm_finished(pax_machine *p)
{
	int const done = finished(p);
	return done;
}
663 
664 
accepted(pax_machine * p)665 static inline int	accepted(pax_machine *p)
666 {
667 	MAY_DBG(FN;
668 	    SYCEXP(p->synode); STRLIT(" op ");
669 	    PTREXP(p); STRLIT(p->acceptor.msg ? pax_op_to_str(p->acceptor.msg->op) : "NULL");
670 	    );
671 	return p->acceptor.msg && p->acceptor.msg->op != initial_op;
672 }
673 
674 
accepted_noop(pax_machine * p)675 static inline int	accepted_noop(pax_machine *p)
676 {
677 	MAY_DBG(FN;
678 	    SYCEXP(p->synode); STRLIT(" op ");
679 	    PTREXP(p); STRLIT(p->acceptor.msg ? pax_op_to_str(p->acceptor.msg->op) : "NULL");
680 	    );
681 	return accepted(p) && p->acceptor.msg->msg_type == no_op;
682 }
683 
684 
noop_match(pax_machine * p,pax_msg * pm)685 static inline int	noop_match(pax_machine *p, pax_msg *pm)
686 {
687 	return pm->msg_type == no_op && accepted_noop(p);
688 }
689 
690 
/*
  Nonzero if any Paxos activity has touched p. The checks are made in
  this order:
  - p->op has left initial_op;
  - the acceptor has made a promise;
  - the proposer's message has left initial_op;
  - a value has been accepted;
  - a value has been learned.
*/
static inline int	started(pax_machine *p)
{
	if (p->op != initial_op)
		return 1;
	if (p->acceptor.promise.cnt > 0)
		return 1;
	if (p->proposer.msg && p->proposer.msg->op != initial_op)
		return 1;
	if (accepted(p))
		return 1;
	return finished(p) != 0;
}
700 
701 
702 /* }}} */
703 
set_last_received_config(synode_no received_config_change)704 void set_last_received_config(synode_no received_config_change)
705 {
706   last_config_modification_id= received_config_change;
707 }
708 
709 /* {{{ Definition of majority */
max_check(site_def const * site)710 static inline node_no max_check(site_def const *site)
711 {
712 #ifdef MAXACCEPT
713 	return MIN(get_maxnodes(site), MAXACCEPT);
714 #else
715 	return get_maxnodes(site);
716 #endif
717 }
718 
719 static site_def * forced_config = 0;
720 
721 /* Definition of majority */
majority(bit_set const * nodeset,site_def const * s,int all,int delay MY_ATTRIBUTE ((unused)),int force)722 static inline int	majority(bit_set const *nodeset, site_def const *s, int all, int delay MY_ATTRIBUTE((unused)), int force)
723 {
724 	node_no ok = 0;
725 	node_no i = 0;
726 	int	retval = 0;
727 	node_no max = max_check(s);
728 
729  	/* DBGOUT(FN; NDBG(max,lu); NDBG(all,d); NDBG(delay,d); NDBG(force,d)); */
730 
731 	/* Count nodes that has answered */
732 	for (i = 0; i < max; i++) {
733 		if (BIT_ISSET(i, nodeset)) {
734 			ok++;
735 		}
736 	}
737 
738 	/* If we are forcing messages, attempt to ensure consistency by
739 	   requiring all remaining nodes to agree. Forced_config points to
740 	   the config that should be used as acceptors in this
741 	   case. Another possibility is to use the original config and
742 	   count the number of live nodes, but since the force flag is
743 	   being used only to force a new config, it seems safer to use
744 	   the new config and no time-dependent info. Note that we are
745 	   counting the answers based on the normal config, but use the
746 	   number of nodes from forced_config. This is safe, since we can
747 	   assume that the nodes that are not in forced_config will never
748 	   answer. */
749 
750 	if(force){
751 		DBGOUT(FN; STRLIT("force majority"); NDBG(ok ,u);  NDBG(max ,u); NDBG(get_maxnodes(forced_config),u));
752 		return ok == get_maxnodes(forced_config);
753 	}else{
754 		/* Have now seen answer from all live nodes */
755 		retval = all ? ok == max : ok > max / 2
756 			|| (ARBITRATOR_HACK && (2 == max));
757 		/* 	DBGOUT(FN; NDBG(max,lu); NDBG(all,d); NDBG(delay,d); NDBG(retval,d)); */
758 		return retval;
759 	}
760 }
761 
762 
763 #define IS_CONS_ALL(p) ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)
764 
765 /* See if a majority of acceptors have answered our prepare */
prep_majority(site_def const * site,pax_machine * p)766 static int	prep_majority(site_def const * site, pax_machine *p)
767 {
768 
769 	int	ok = 0;
770 
771 	assert(p);
772 	assert(p->proposer.prep_nodeset);
773 	assert(p->proposer.msg);
774 	/* DBGOUT(FN; BALCEXP(p->proposer.bal)); */
775 	ok = majority(p->proposer.prep_nodeset, site, IS_CONS_ALL(p), p->proposer.bal.cnt == 1, p->proposer.msg->force_delivery || p->force_delivery);
776 	return ok;
777 }
778 
779 /* See if a majority of acceptors have answered our propose */
prop_majority(site_def const * site,pax_machine * p)780 static int	prop_majority(site_def const * site, pax_machine *p)
781 {
782 	int	ok = 0;
783 
784 	assert(p);
785 	assert(p->proposer.prop_nodeset);
786 	assert(p->proposer.msg);
787 	/* DBGOUT(FN; BALCEXP(p->proposer.bal)); */
788 	ok = majority(p->proposer.prop_nodeset, site, IS_CONS_ALL(p), p->proposer.bal.cnt == 1, p->proposer.msg->force_delivery || p->force_delivery);
789 	return ok;
790 }
791 
792 /* }}} */
793 
794 /* {{{ Xcom thread */
795 
796 /* purecov: begin deadcode */
797 /* Xcom thread start function */
xcom_thread_main(gpointer cp)798 gpointer xcom_thread_main(gpointer cp)
799 {
800 	G_MESSAGE("Starting xcom on port %d", atoi((char *)cp));
801 	xcom_thread_init();
802 	/* Initialize task system and enter main loop */
803 	taskmain((xcom_port)atoi((char *)cp));
804 	/* Xcom is finished when we get here */
805 	DBGOUT(FN; STRLIT("Deconstructing xcom thread"));
806 	xcom_thread_deinit();
807 	G_MESSAGE("Exiting xcom thread");
808 	return NULL;
809 }
810 /* purecov: end */
811 static site_def const * executor_site = 0;
812 
get_executor_site()813 site_def const * get_executor_site()
814 {
815 	return executor_site;
816 }
817 
818 static site_def *proposer_site = 0;
819 
get_proposer_site()820 site_def const *get_proposer_site()
821 {
822 	return proposer_site;
823 }
824 
825 
/*
  Reset the xcom base state, then re-initialize the supporting modules.
  The state reset clears the shutdown flag and the message counters, marks
  client boot / net boot / booting as not done, and sets start_type to IDLE.
  The modules re-initialized are: recovery, this instance's unique id, the
  site definition stack, the cache variables, the median filter and the
  executor wait list. Finally the executor and proposer site pointers are
  cleared.
*/
void	init_xcom_base()
{
	xcom_shutdown = 0;
	current_message = null_synode;
	executed_msg = null_synode;
	max_synode = null_synode;
	client_boot_done = 0;
	netboot_ok = 0;
	booting = 0;
	start_type = IDLE;

	xcom_recover_init();
	my_id = new_id();
	push_site_def(NULL); /* NOTE(review): presumably starts with an empty site definition - confirm */
/*	update_servers(NULL); */
	xcom_cache_var_init();
	median_filter_init();
	link_init(&exec_wait, type_hash("task_env")); /* List of tasks the executor wakes up */
	executor_site = 0;
	proposer_site = 0;
}
847 
/*
  Clear the handles of all xcom tasks via set_task(..., NULL). The tasks
  covered are boot, net_boot, net_recover, killer, executor, retry,
  detector, every proposer, alive_t and sweeper.
*/
static void	init_tasks()
{
	set_task(&boot, NULL);
	set_task(&net_boot, NULL);
	set_task(&net_recover, NULL);
	set_task(&killer, NULL);
	set_task(&executor, NULL);
	set_task(&retry, NULL);
	set_task(&detector, NULL);
	init_proposers();
	set_task(&alive_t, NULL);
	set_task(&sweeper, NULL);
}
861 
862 
/* Initialize the xcom thread */
/*
  Steps, in order:
  - ignore SIGPIPE, unless NO_SIGPIPE is defined;
  - reset globals and site variables;
  - initialize CRC32C;
  - seed the random generator from task_now();
  - reset the base state, the task handles and the cache;
  - set up the proposer input queue, the link list and the task system.
*/
void	xcom_thread_init()
{
#ifndef NO_SIGPIPE
	signal(SIGPIPE, SIG_IGN);
#endif
	init_base_vars();
	init_site_vars();
	init_crc32c();
	my_srand48((long int)task_now());

	init_xcom_base();
	init_tasks();
	init_cache();

	/* Initialize input queue */
	channel_init(&prop_input_queue, type_hash("msg_link"));
	init_link_list();
	task_sys_init();
}
883 
884 
885 /* Empty the proposer input queue */
empty_prop_input_queue()886 static void	empty_prop_input_queue()
887 {
888 	empty_msg_channel(&prop_input_queue);
889 	MAY_DBG(FN; STRLIT("prop_input_queue empty"));
890 }
891 
892 
/* De-initialize the xcom thread */
/*
  Empty the proposer input queue and the link free list, de-initialize
  the cache, then garbage collect the server structures.
*/
void	xcom_thread_deinit()
{
	DBGOUT(FN; STRLIT("Empty proposer input queue"));
	empty_prop_input_queue();
	DBGOUT(FN; STRLIT("Empty link free list"));
	empty_link_free_list();
	DBGOUT(FN; STRLIT("De-initialize cache"));
	deinit_cache();
	garbage_collect_servers();
}
904 
905 #define PROP_ITER int i;  for(i = 0; i < PROPOSERS; i++)
906 
907 static bool_t force_recover = FALSE;
908 /* purecov: begin deadcode */
must_force_recover()909 bool_t must_force_recover()
910 {
911 	return force_recover;
912 }
913 
914 
set_force_recover(bool_t const x)915 void	set_force_recover(bool_t const x)
916 {
917 	force_recover = x;
918 }
919 /* purecov: end */
920 
init_proposers()921 static void	init_proposers()
922 {
923 	PROP_ITER {
924 		set_task(&proposer[i], NULL);
925 	}
926 }
927 
928 
create_proposers()929 static void	create_proposers()
930 {
931 	PROP_ITER {
932 		set_task(&proposer[i], task_new(proposer_task, int_arg(i), "proposer_task", XCOM_THREAD_DEBUG));
933 	}
934 }
935 
936 
terminate_proposers()937 static void	terminate_proposers()
938 {
939 	PROP_ITER {
940 		task_terminate(proposer[i]);
941 	}
942 }
943 
free_forced_config_site_def()944 static void free_forced_config_site_def()
945 {
946   free_site_def(forced_config);
947   forced_config= NULL;
948 }
949 
#if TASK_DBUG_ON
static void	dbg_proposers() MY_ATTRIBUTE((unused));
/*
  Print PROPOSERS and every proposer task handle as one debug line.
  NOTE(review): PPUT presumably prints the expression text (proposer[i]),
  which is why the loop keeps PROP_ITER and its variable name i.
*/
static void	dbg_proposers()
{
	GET_GOUT;
	NDBG(PROPOSERS, d);
	{
		PROP_ITER {
			PPUT(proposer[i]);
		}
	}
	PRINT_GOUT;
	FREE_GOUT;
}
#endif
965 
set_proposer_startpoint()966 static void	set_proposer_startpoint()
967 {
968 	DBGOHK(FN; STRLIT("changing current message"));
969 	if (max_synode.msgno <= 1)
970 		set_current_message(first_free_synode(max_synode));
971 	else
972 		set_current_message(incr_msgno(first_free_synode(max_synode)));
973 }
974 
975 
976 
check_tasks()977 void	check_tasks()
978 {
979 }
980 
981 
982 /* }}} */
983 
984 /* {{{ Task functions */
985 /* purecov: begin deadcode */
986 /* Match any port */
static int yes(xcom_port port MY_ATTRIBUTE((unused)))
{
	/* Port matcher that accepts every port unconditionally. */
	int const matches = 1;
	return matches;
}
991 
992 /* Create tasks and enter the task main loop */
/*
  Legacy entry point: set up the transport, listen on listen_port, start the
  generator and TCP server tasks and run the task scheduler until it exits.

  If the port cannot be announced, the error is logged and the function
  returns immediately. Starting tcp_server with a negative fd and entering
  task_loop() would leave a process that can never accept connections.
  xcom_taskmain2 handles the same failure by returning.

  Always returns 1.
*/
int	taskmain(xcom_port listen_port)
{
	init_xcom_transport(listen_port);
	set_port_matcher(yes); /* For clients that use only addr, not addr:port  */

	MAY_DBG(FN; STRLIT("enter taskmain"));
	ignoresig(SIGPIPE);

	{
		result	fd = {0,0};

		if ((fd = announce_tcp(listen_port)).val < 0) {
			MAY_DBG(FN; STRLIT("cannot annonunce tcp "); NDBG(listen_port, d));
			task_dump_err(fd.funerr);
			g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
			/* Do not start tcp_server on an invalid descriptor. */
			return 1;
		}

		MAY_DBG(FN; STRLIT("Creating tasks"));
		task_new(generator_task, null_arg, "generator_task", XCOM_THREAD_DEBUG);
		task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
		MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
	}

	task_loop();

	MAY_DBG(FN; STRLIT(" exit"));
	return 1;
}
1024 
1025 
start_run_tasks()1026 void	start_run_tasks()
1027 {
1028 	force_recover = 0;
1029 	client_boot_done = 1;
1030 	netboot_ok = 1;
1031 	booting = 0;
1032 	set_proposer_startpoint();
1033 	create_proposers();
1034 	set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
1035 	set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
1036 	set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
1037 	set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));
1038 }
1039 
1040 /* Create tasks and enter the task main loop */
int	xcom_taskmain(xcom_port listen_port)
{
	result listener = {0,0};

	init_xcom_transport(listen_port);

	MAY_DBG(FN; STRLIT("enter taskmain"));
	ignoresig(SIGPIPE);

	/* Open the listening socket; the whole process exits if that fails. */
	listener = announce_tcp(listen_port);
	if (listener.val < 0) {
		MAY_DBG(FN; STRLIT("cannot annonunce tcp "); NDBG(listen_port, d));
		task_dump_err(listener.funerr);
		g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
		pexitall(1);
	}

	MAY_DBG(FN; STRLIT("Creating tasks"));
	/* Network tasks: server on the announced socket, plus the TCP reaper. */
	task_new(tcp_server, int_arg(listener.val), "tcp_server", XCOM_THREAD_DEBUG);
	task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);
	MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));

	/* Paxos tasks, then run the scheduler until it returns. */
	start_run_tasks();
	task_loop();

	MAY_DBG(FN; STRLIT(" exit"));
	return 1;
}
1072 /* purecov: end */
1073 static xcom_state_change_cb xcom_run_cb = 0;
1074 static xcom_state_change_cb xcom_terminate_cb = 0;
1075 static xcom_state_change_cb xcom_comms_cb = 0;
1076 static xcom_state_change_cb xcom_exit_cb = 0;
1077 static xcom_state_change_cb xcom_expel_cb = 0;
1078 
set_xcom_run_cb(xcom_state_change_cb x)1079 void set_xcom_run_cb(xcom_state_change_cb x)
1080 {
1081 	xcom_run_cb = x;
1082 }
1083 
set_xcom_comms_cb(xcom_state_change_cb x)1084 void set_xcom_comms_cb(xcom_state_change_cb x)
1085 {
1086   xcom_comms_cb = x;
1087 }
1088 /* purecov: begin deadcode */
set_xcom_terminate_cb(xcom_state_change_cb x)1089 void set_xcom_terminate_cb(xcom_state_change_cb x)
1090 {
1091 	xcom_terminate_cb = x;
1092 }
1093 /* purecov: end */
set_xcom_exit_cb(xcom_state_change_cb x)1094 void set_xcom_exit_cb(xcom_state_change_cb x)
1095 {
1096 	xcom_exit_cb = x;
1097 }
1098 
set_xcom_expel_cb(xcom_state_change_cb x)1099 void set_xcom_expel_cb(xcom_state_change_cb x)
1100 {
1101 	xcom_expel_cb = x;
1102 }
1103 
/**
   Main loop of the XCom thread. It initializes the transport, listens on
   listen_port, runs the task scheduler until it exits, and then releases
   SSL state and thread-local resources.

   If registered, the comms callback is told whether the port could be
   announced (XCOM_COMMS_OK / XCOM_COMMS_ERROR). On failure the terminate
   callback is also invoked, no tasks are started, and control goes to the
   common cleanup. The failure path therefore releases the same resources as
   a normal shutdown and no longer leaks them through an early return.

   Always returns 1.
*/
int	xcom_taskmain2(xcom_port listen_port)
{
	init_xcom_transport(listen_port);

	MAY_DBG(FN; STRLIT("enter taskmain"));
	ignoresig(SIGPIPE);

	{
		result fd = {0,0};
		if ((fd = announce_tcp(listen_port)).val < 0) {
			MAY_DBG(FN; STRLIT("cannot annonunce tcp "); NDBG(listen_port, d));
			task_dump_err(fd.funerr);
			g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
			if(xcom_comms_cb){
				xcom_comms_cb(XCOM_COMMS_ERROR);
			}
			if(xcom_terminate_cb){
				xcom_terminate_cb(0);
			}
			/* Skip the task loop but still run the common cleanup. */
			goto cleanup;
		}

		if(xcom_comms_cb){
			xcom_comms_cb(XCOM_COMMS_OK);
		}

		MAY_DBG(FN; STRLIT("Creating tasks"));
		task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
		task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);
		MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
	}

	task_loop();

cleanup:
	;
#if defined(XCOM_HAVE_OPENSSL)
	xcom_cleanup_ssl();
#endif

	MAY_DBG(FN; STRLIT(" exit"));
	xcom_thread_deinit();
	return 1;
}
1149 
1150 
1151 /* {{{ Paxos message construction and sending */
1152 
1153 /* Initialize a message for sending */
static void	prepare(pax_msg *p, pax_op op)
{
	/* The message answers its own current ballot. */
	p->reply_to = p->proposal;
	p->op = op;
}
1159 
1160 
1161 /* Initialize a prepare_msg */
/* Turn p into a Paxos prepare and send it to the acceptors. Returns the
   result of send_to_acceptors(). */
static int	prepare_msg(pax_msg *p)
{
	int rc;

	prepare(p, prepare_op);
	rc = send_to_acceptors(p, "prepare_msg");
	return rc;
}
1168 
1169 
1170 /* Initialize a noop_msg */
/* Make p a prepare carrying a no-op value and hand it back. */
static pax_msg *create_noop(pax_msg *p)
{
	prepare(p, prepare_op);
	p->msg_type = no_op;
	return p;
}
1177 
1178 
1179 /* Initialize a read_msg */
/* Make p a read request issued by this node of site and hand it back. */
static pax_msg *create_read(site_def const * site, pax_msg *p)
{
	/* The ballot node must be set before prepare() copies it to reply_to. */
	p->proposal.node = get_nodeno(site);
	prepare(p, read_op);
	p->msg_type = normal;
	return p;
}
1187 
1188 
/* Broadcast p as a skip (no-op) for its synode; returns send_to_all()'s result. */
static int	skip_msg(pax_msg *p)
{
	prepare(p, skip_op);
	p->msg_type = no_op;
	MAY_DBG(FN; STRLIT("skipping message "); SYCEXP(p->synode));
	return send_to_all(p, "skip_msg");
}
1196 
1197 
/* Stamp the first app_data of p with p's synode: the app_key gets msgno,
   node and group id, and the app_data's group_id is set as well. */
static void	brand_app_data(pax_msg *p)
{
	app_data_ptr app = p->a;

	if (!app)
		return;
	app->app_key.msgno = p->synode.msgno;
	app->app_key.node = p->synode.node;
	app->group_id = p->synode.group_id;
	app->app_key.group_id = app->group_id;
}
1206 
1207 
/* Return synode with its group_id replaced by my_id. my_id is the per-instance
   random number derived from node number and timestamp. */
static synode_no my_unique_id(synode_no synode)
{
	synode_no branded = synode;

	assert(my_id != 0);
	branded.group_id = my_id;
	return branded;
}
1215 
1216 
/* Set unique_id = synode on every app_data in msg's list. */
static void	set_unique_id(pax_msg *msg, synode_no synode )
{
	app_data_ptr cur;

	for (cur = msg->a; cur; cur = cur->next)
		cur->unique_id = synode;
}
1225 
1226 
/* Turn p into an accept request (replying to its own ballot), brand its app
   data with the synode and send it to the acceptors. */
static int	propose_msg(pax_msg *p)
{
	prepare(p, accept_op);
	brand_app_data(p);
	return send_to_acceptors(p, "propose_msg");
}
1235 
1236 
/* Mark p as a learn message: a normal value if it carries app data,
   otherwise a no-op. */
static void	set_learn_type(pax_msg *p)
{
	p->op = learn_op;
	if (p->a)
		p->msg_type = normal;
	else
		p->msg_type = no_op;
}
1242 
1243 /* purecov: begin deadcode */
learn_msg(site_def const * site,pax_msg * p)1244 static int	learn_msg(site_def const * site, pax_msg *p)
1245 {
1246 	set_learn_type(p);
1247 	p->reply_to = p->proposal;
1248 	brand_app_data(p);
1249 	MAY_DBG(FN;
1250 	    dbg_bitset(p->receivers, get_maxnodes(site));
1251 	    );
1252 	return send_to_all_site(site, p, "learn_msg");
1253 }
1254 /* purecov: end */
/* Broadcast a compact learn (tiny_learn_op) for p's synode to all nodes of
   site. The message is a clone of p made by clone_pax_msg_no_app(); it replies
   to the ballot the local proposer used for that synode. */
static int	tiny_learn_msg(site_def const *site, pax_msg *p)
{
	int	rc;
	pax_msg * tiny = clone_pax_msg_no_app(p);
	pax_machine * machine = get_cache(p->synode);

	ref_msg(tiny);
	tiny->op = tiny_learn_op;
	tiny->msg_type = p->a ? normal : no_op;
	tiny->reply_to = machine->proposer.bal;
	brand_app_data(tiny);
	MAY_DBG(FN;
	    dbg_bitset(tiny->receivers, get_maxnodes(site));
	    );
	rc = send_to_all_site(site, tiny, "tiny_learn_msg");
	unref_msg(&tiny);
	return rc;
}
1273 
1274 
1275 /* }}} */
1276 
1277 /* {{{ Proposer task */
1278 
/* Pick a fresh ballot for a three-phase push of msgno. The ballot belongs to
   this node and its count is one above both the last proposed count and the
   promised count. It is then copied into msg. */
static void	prepare_push_3p(site_def const * site, pax_machine *p, pax_msg *msg, synode_no msgno)
{
	MAY_DBG(FN;
	    SYCEXP(msgno);
	    NDBG(p->proposer.bal.cnt, d); NDBG(p->acceptor.promise.cnt, d));
	p->proposer.bal.node = get_nodeno(site);
	if (p->acceptor.promise.cnt > p->proposer.bal.cnt)
		p->proposer.bal.cnt = p->acceptor.promise.cnt;
	p->proposer.bal.cnt++;
	msg->synode = msgno;
	msg->proposal = p->proposer.bal;
}
1292 
1293 
/* Two-phase push: skip the prepare round and send the proposer's message
   straight to the acceptors with ballot (0, this node). */
static void	push_msg_2p(site_def const * site, pax_machine *p)
{
	pax_msg *proposal_msg = p->proposer.msg;

	assert(proposal_msg);

	BIT_ZERO(p->proposer.prop_nodeset);
	MAY_DBG(FN; SYCEXP(p->synode));
	p->proposer.bal.cnt = 0;
	p->proposer.bal.node = get_nodeno(site);
	proposal_msg->proposal = p->proposer.bal;
	proposal_msg->synode = p->synode;
	proposal_msg->force_delivery = p->force_delivery;
	propose_msg(proposal_msg);
}
1307 
1308 
/* Three-phase push: choose a new ballot for msgno, reset the prepare reply
   set and send msg as a prepare of type msg_type. */
static void	push_msg_3p(site_def const * site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
{
	assert(msgno.msgno != 0);
	prepare_push_3p(site, p, msg, msgno);
	BIT_ZERO(p->proposer.prep_nodeset);
	msg->msg_type = msg_type;
	assert(p->proposer.msg);
	msg->force_delivery = p->force_delivery;
	prepare_msg(msg);
	MAY_DBG(FN; BALCEXP(msg->proposal);
	    SYCEXP(msgno); STRLIT(" op "); STRLIT(pax_op_to_str(msg->op)));
}
1321 
1322 
1323 /* Brand client message with unique ID */
brand_client_msg(pax_msg * msg,synode_no msgno)1324 static void	brand_client_msg(pax_msg *msg, synode_no msgno)
1325 {
1326 	assert(!synode_eq(msgno, null_synode));
1327 	set_unique_id(msg, my_unique_id(msgno));
1328 }
1329 
1330 /* purecov: begin deadcode */
reject_send(site_def const * site,recover_action r)1331 static int	reject_send(site_def const * site, recover_action r)
1332 {
1333 	return r != rec_send && xcom_booted() && (!site || !enough_live_nodes(site));
1334 }
1335 /* purecov: end */
1336 
/* Attach a to msg as a client message and queue it on prop_input_queue,
   where proposer tasks pick it up. */
void	xcom_send(app_data_ptr a, pax_msg *msg)
{
	msg_link *queued;

	MAY_DBG(FN; PTREXP(a); SYCEXP(a->app_key); SYCEXP(msg->synode));
	msg->a = a;
	msg->op = client_msg;
	queued = msg_link_new(msg, VOID_NODE_NO);
	MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_pax_msg(msg)));
	channel_put(&prop_input_queue, &queued->l);
}
1348 
1349 /* purecov: begin deadcode */
/*
  Legacy generator task (dead code; see the purecov markers).

  NOTE(review): `a` is set to 0 at the top of every iteration and nothing
  assigns it before `while (a)`, so the loop body never runs. The task only
  calls check_tasks() once and then repeats TASK_DELAY(0.1) forever.
  The unreachable body shows how an incoming app_data used to be handled:
    - exit_type: bury the current group, kill all tasks, reset state and
      set xcom_shutdown;
    - reset_type / remove_reset_type: reset shared state (the group is
      buried only for reset_type);
    - anything else: dropped if reject_send() says so, otherwise wrapped
      in a pax_msg (marked RECOVER, and chosen when force_recover is set)
      and queued with xcom_send().
*/
static int	generator_task(task_arg arg MY_ATTRIBUTE((unused)))
{
	DECL_ENV
	    int	dummy;
	END_ENV;

	TASK_BEGIN
	    MAY_DBG(FN; );
	check_tasks(); /* Start tasks which should be running */
	for(;;) {
		app_data_ptr a = 0;
		while (a) { /* never entered: a is always 0 here */
			assert(!(a->chosen && synode_eq(a->app_key, null_synode)));
			MAY_DBG(FN; PTREXP(a); SYCEXP(a->app_key));
			MAY_DBG(FN;
			    COPY_AND_FREE_GOUT(dbg_app_data(a));
			    );
			if (a->body.c_t == exit_type) {
				bury_site(get_group_id(get_site_def()));
				copy_app_data(&a, NULL);
				task_terminate_all();    /* Kill, kill, kill, kill, kill, kill. This is the end. */

				init_xcom_base();             /* Reset shared variables */
				init_tasks();            /* Reset task variables */
				free_site_defs();
				free_forced_config_site_def();
				garbage_collect_servers();
				DBGOUT(FN; STRLIT("shutting down"));
				xcom_shutdown = 1;
				TERMINATE;
			} else if (a->body.c_t == reset_type || a->body.c_t == remove_reset_type) {
				if(a->body.c_t == reset_type) /* Not for remove node */
					bury_site(get_group_id(get_site_def()));
				copy_app_data(&a, NULL);
				init_xcom_base();             /* Reset shared variables */
				check_tasks(); /* Stop tasks which should not be running */
				free_site_defs();
				free_forced_config_site_def();
				garbage_collect_servers();
			} else {
				if (reject_send(get_site_def(), a->recover)) {
					copy_app_data(&a, NULL);
				} else {
					pax_msg * msg = pax_msg_new(null_synode, get_site_def());
					if (is_real_recover(a)) {
						msg->start_type = RECOVER;
						if (force_recover) {
							/* We are desperate to recover,
							   fake an accepted message with null key */
							DBGOUT(FN; STRLIT("forcing recovery "));
							a->chosen = TRUE;
						}
					}
					xcom_send(a, msg);
				}
			}
		}

		TASK_DELAY(0.1);
	}
	FINALLY
	    TASK_END;
}
1413 /* purecov: end */
1414 
/* Standard 32-bit FNV offset basis (not used by fnv_hash's callers here,
   which seed with 0). */
#define FNVSTART 0x811c9dc5

/* FNV-1 style 32-bit hash: for each byte, multiply the running value by the
   FNV prime 0x01000193, then XOR in the byte. `sum` is the starting value,
   which lets calls be chained. Returns sum unchanged when length is 0. */
static uint32_t fnv_hash(unsigned char *buf, size_t length, uint32_t sum)
{
  size_t pos = 0;
  while (pos < length) {
    sum *= (uint32_t)0x01000193;
    sum ^= (uint32_t)buf[pos];
    ++pos;
  }
  return sum;
}
1426 
/**
   Create a new (hopefully unique) ID. The basic idea is to create a hash from
   the host ID and a timestamp.

   Never returns 0 and never returns an ID for which is_dead_site() is true.
   The first candidate is hash(id, timestamp), the same value as before. If it
   is rejected, an attempt counter is mixed into the hash so every retry
   produces a new candidate. Re-hashing the unchanged id and timestamp would
   reproduce the rejected value forever.
*/
uint32_t new_id()
{
	long	id = get_unique_long();
	double	timestamp = task_now();
	uint32_t retval = 0;
	uint32_t attempt = 0;

	do {
		retval = fnv_hash((unsigned char *) & id, sizeof(id), 0);
		retval = fnv_hash((unsigned char *) & timestamp, sizeof(timestamp), retval);
		if (attempt != 0) {
			/* Perturb retries only; the first attempt keeps the old value. */
			retval = fnv_hash((unsigned char *) & attempt, sizeof(attempt), retval);
		}
		attempt++;
	} while (retval == 0 ||
	    is_dead_site(retval)); /* Avoid returning 0 or already used site id */
	return retval;
}
1443 
getstart(app_data_ptr a)1444 static synode_no getstart(app_data_ptr a)
1445 {
1446 	synode_no retval = null_synode;
1447 	G_DEBUG("getstart group_id %x", a->group_id);
1448 	if (!a || a->group_id == null_id) {
1449 		retval.group_id = new_id();
1450 	} else {
1451 		a->app_key.group_id = a->group_id;
1452 		retval = a->app_key;
1453 		if (get_site_def() && retval.msgno != 1) {
1454 			/* Not valid until after event horizon has been passed */
1455 			synode_set_to_event_horizon(&retval);
1456 		}
1457 	}
1458 	return retval;
1459 }
1460 
/**
   Install site as the newest configuration.

   Steps, in order:
   - raise max_synode to site->start if the site starts later;
   - compute this node's index in the site (xcom_find_node_index);
   - push the site with push_site_def() and make its group id the active one;
   - call update_servers(site, operation) if get_site_def() reports nodes;
   - record the install time.

   NOTE(review): the order matters. get_site_def() is read before
   push_site_def() (first debug line) and after it (the get_maxnodes() check),
   presumably returning the newly pushed site the second time. Confirm
   before reordering.
*/
void site_install_action(site_def *site, cargo_type operation)
{
	DBGOUT(FN; NDBG(get_nodeno(get_site_def()), u));
	if (synode_gt(site->start, max_synode))
		set_max_synode(site->start);
	site->nodeno = xcom_find_node_index(&site->nodes);
	push_site_def(site);
	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_site_def(site)));
	set_group(get_group_id(site));
	if(get_maxnodes(get_site_def())){
		update_servers(site, operation);
	}
	site->install_time = task_now();
	DBGOUT(FN; SYCEXP(site->start); SYCEXP(site->boot_key));
	DBGOUT(FN; NDBG(get_nodeno(site), u));
	DBGOUT(SYCEXP(site->start); SYCEXP(site->boot_key); NDBG(site->install_time,f));
	DBGOUT(NDBG(get_nodeno(site), u));
}
1479 
create_site_def_with_start(app_data_ptr a,synode_no start)1480 static site_def *create_site_def_with_start(app_data_ptr a, synode_no start)
1481 {
1482 	site_def * site = new_site_def();
1483 	MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
1484 	init_site_def(a->body.app_u_u.nodes.node_list_len,
1485 				  a->body.app_u_u.nodes.node_list_val, site);
1486 	site->start = start;
1487 	site->boot_key = a->app_key;
1488 	return site;
1489 }
1490 
1491 
install_ng_with_start(app_data_ptr a,synode_no start)1492 static site_def * install_ng_with_start(app_data_ptr a, synode_no start)
1493 {
1494 	if (a) {
1495 		site_def *site = create_site_def_with_start(a, start);
1496 		site_install_action(site, a->body.c_t);
1497 		return site;
1498 	}
1499 	return 0;
1500 }
1501 
1502 
install_node_group(app_data_ptr a)1503 site_def *install_node_group(app_data_ptr a)
1504 {
1505 	ADD_EVENTS(
1506 	    add_event(string_arg("a->app_key"));
1507 	    add_synode_event(a->app_key);
1508 	    );
1509 	if (a)
1510 		return install_ng_with_start(a, getstart(a));
1511 		else
1512 		return 0;
1513 }
1514 
1515 /* purecov: begin deadcode */
is_real_recover(app_data_ptr a)1516 int	is_real_recover(app_data_ptr a)
1517 {
1518 	return  a && a->body.c_t == xcom_recover && a->body.app_u_u.rep.msg_list.synode_no_array_len > 0;
1519 }
1520 /* purecov: end */
1521 
/* Overwrite the tracked maximum synode. No comparison is done here; callers
   such as site_install_action check synode_gt first. */
void	set_max_synode(synode_no synode)
{
	max_synode = synode;
	MAY_DBG(FN; STRLIT("new "); SYCEXP(max_synode));
}
1527 
1528 /* purecov: begin deadcode */
learn_accepted_value(site_def const * site,pax_msg * p,synode_no synode)1529 static void	learn_accepted_value(site_def const * site, pax_msg *p, synode_no synode)
1530 {
1531 	pax_msg * msg = pax_msg_new(synode, site);
1532 	ref_msg(msg);
1533 	copy_app_data(&msg->a, p->a);
1534 	msg->start_type = p->start_type;
1535 	set_learn_type(msg);
1536 	MAY_DBG(FN; STRLIT("trying to learn known value "); SYCEXP(synode));
1537 	send_to_all_site(site, msg, "learn_accepted_value");
1538 	unref_msg(&msg);
1539 }
1540 /* purecov: end */
1541 
/* Non-zero when s already has a Paxos machine that has started. */
static int	is_busy(synode_no s)
{
	pax_machine * machine = hash_get(s);
	return machine ? started(machine) : 0;
}
1551 
/* Disabled helper (compiled out by #if 0). It scans forward from msgno to the
   first synode whose Paxos machine is not busy, returns that synode, and
   stores the writable site_def for it in *site. proposer_task does a similar
   scan inline (retry_new), also waiting while too far ahead of the executor. */
#if 0
static synode_no find_slot(synode_no msgno, site_def **site)
{
	assert(!synode_eq(msgno, null_synode));
	while (is_busy(msgno)) {
		msgno = incr_msgno(msgno);
	}
	assert(!synode_eq(msgno, null_synode));
	*site = find_site_def_rw(msgno);
	return msgno;
}
#endif
1564 
match_my_msg(pax_msg * learned,pax_msg * mine)1565 bool_t match_my_msg(pax_msg *learned, pax_msg *mine)
1566 {
1567 	MAY_DBG(FN; PTREXP(learned->a);
1568 	    if (learned->a)
1569 	    SYCEXP(learned->a->unique_id);
1570 	    PTREXP(mine->a);
1571 	    if (mine->a)
1572 	    SYCEXP(mine->a->unique_id);
1573 	    )		;
1574 	if (learned->a && mine->a) { /* Both have app data, see if data is mine */
1575 		return synode_eq(learned->a->unique_id, mine->a->unique_id);
1576 	} else if (!(learned->a || mine->a)) { /* None have app data, anything goes */
1577 		return TRUE;
1578 	} else { /* Definitely mismatch */
1579 		return FALSE;
1580 	}
1581 }
1582 
1583 
#if TASK_DBUG_ON
/* Debug helper: print the label s, then the membership bit of bs for node
   indices 0 .. get_maxnodes(site)-1. Output stops early if bs stores fewer
   bits than that. */
static void	dbg_reply_set(site_def const * site, const char *s, bit_set *bs)
{
	unsigned int	i = 0;
	unsigned int	n = get_maxnodes(site);
	GET_GOUT;
	STRLIT(s);
	/* Second bound = elements * bytes per element * bits per byte, so the
	   loop never reads past the storage of bs. */
	for (i = 0; i <  n && i < bs->bits.bits_len * sizeof(*bs->bits.bits_val) * BITS_PER_BYTE; i++) {
		NPUT(BIT_ISSET(i, bs), d);
	}
	PRINT_GOUT;
	FREE_GOUT;
}
#endif
1598 
1599 
1600 static void	propose_noop(synode_no find, pax_machine *p);
1601 
too_far(synode_no s)1602 static inline int too_far(synode_no s)
1603 {
1604 	return s.msgno >= executed_msg.msgno + event_horizon;
1605 }
1606 
/* Log the jump target with DBGOUT, then jump to label x.
   NOTE(review): the expansion is a bare { ... } block, not
   do { ... } while (0), so `if (c) GOTO(l); else ...` would not compile.
   The uses in proposer_task are all inside braces or form an else branch. */
#define GOTO(x) {DBGOUT(STRLIT("goto "); STRLIT(#x)); goto x; }
1608 
/* Non-zero for view messages (which proposer_task never batches). */
static inline int is_view(cargo_type x)
{
	int const view = (x == view_msg);
	return view;
}
1613 
/* Non-zero for configuration-changing cargo types. proposer_task never
   batches these because each needs its own message number. */
static inline int is_config(cargo_type x)
{
	switch (x) {
	case unified_boot_type:
	case add_node_type:
	case remove_node_type:
	case force_config_type:
		return 1;
	default:
		return 0;
	}
}
1621 
/* Defined elsewhere in this file; used below when cloning the client message
   fails for lack of memory. */
static void terminate_and_exit();

/*
  Send messages by fetching from the input queue and trying to get them
  accepted by a Paxos instance.

  One proposer coroutine; arg is its slot index (create_proposers() starts
  PROPOSERS of them). Each iteration of the main loop:
    1. Takes one client message from prop_input_queue. Unless it is a config
       or view message, further queued payloads are batched into it (newest
       first in the app_data list) while the summed app_data_size() stays
       within MAX_BATCH_SIZE.
    2. If the payload is already chosen (an "old" message), keeps sending
       learn_accepted_value() for its app_key until that slot is finished.
    3. Otherwise scans forward from current_message for a slot that is not
       busy, waiting on exec_wait while too_far() ahead of the executor. It
       proposes there: 3-phase when threephase or force_delivery is set,
       otherwise 2-phase, re-pushing with 3-phase after each wakeup_delay()
       interval. If a different value wins the slot, it retries from
       retry_new.
  A client message still held when the task ends is failed with
  delivery_failure in FINALLY.

  State that must survive task switches lives in *ep (DECL_ENV ... END_ENV).
  Plain C locals such as `lock`, `tmp` and `atmp` presumably do not survive a
  yield. TODO(review): confirm against the TASK_* macro definitions.
*/
static int	proposer_task(task_arg arg)
{
	DECL_ENV
	    int	self;       /* ID of this proposer task */
	pax_machine * p; /* Pointer to Paxos instance */
	msg_link * client_msg; /* The client message we are trying to push */
	synode_no msgno;
	pax_msg * prepare_msg;
	double	start_propose;
	double	start_push;
	double	delay;
	site_def const *site;
	size_t size;
	END_ENV;

	TASK_BEGIN

	  ep->self = get_int_arg(arg);
	ep->p = NULL;
	ep->client_msg = NULL;
	ep->prepare_msg = NULL;
	ep->start_propose = 0.0;
	ep->start_push = 0.0;
	ep->delay = 0.0;
	ep->msgno = current_message;
	ep->site = 0;
	ep->size = 0;

	MAY_DBG(FN; NDBG(ep->self, d); NDBG(task_now(), f));

	while (!xcom_shutdown) { /* Loop until no more work to do */
		int	MY_ATTRIBUTE((unused)) lock = 0;
		/* Wait for client message */
		assert(!ep->client_msg);
		CHANNEL_GET(&prop_input_queue, &ep->client_msg, msg_link);
		MAY_DBG(FN; PTREXP(ep->client_msg->p->a); STRLIT("extracted "); SYCEXP(ep->client_msg->p->a->app_key));

		/* Grab rest of messages in queue as well, but never batch config messages, which need a unique number */

		if(!is_config(ep->client_msg->p->a->body.c_t) && !is_view(ep->client_msg->p->a->body.c_t)){
			ep->size = app_data_size(ep->client_msg->p->a);
		    while(AUTOBATCH && ep->size <= MAX_BATCH_SIZE &&
		    	! link_empty(&prop_input_queue.data)){  /* Batch payloads into single message */
		    	msg_link *tmp;
		    	app_data_ptr atmp;

				/* The loop condition guarantees the queue is non-empty, so this
				   presumably does not yield while tmp/atmp are live - verify. */
				CHANNEL_GET(&prop_input_queue, &tmp, msg_link);
				atmp = tmp->p->a;
				ep->size += app_data_size(atmp);
				/* Abort batching if config or too big batch */
				if(is_config(atmp->body.c_t) || is_view(atmp->body.c_t) ||
				ep->size > MAX_BATCH_SIZE){
					/* Return the message to the head of the queue for the next round. */
					channel_put_front(&prop_input_queue, &tmp->l);
					break;
				}
		    	ADD_T_EV(seconds(),__FILE__, __LINE__, "batching");

				tmp->p->a = 0; 		/* Steal this payload */
				msg_link_delete(&tmp);  /* Get rid of the empty message */
				atmp->next = ep->client_msg->p->a; /* Add to list of app_data */
				G_TRACE("Batching %s %s", cargo_type_to_str(ep->client_msg->p->a->body.c_t),
					cargo_type_to_str(atmp->body.c_t));
				ep->client_msg->p->a = atmp; /* newest payload becomes the list head */
				MAY_DBG(FN; PTREXP(ep->client_msg->p->a); STRLIT("extracted "); SYCEXP(ep->client_msg->p->a->app_key));
		    }
	    }
		ep->start_propose = task_now();
		ep->delay = 0.0;


		assert(!(AUTOBATCH && ep->client_msg->p->a->chosen));

		/* See if value is known already (old message) */
		if (ep->client_msg->p->a->chosen) {
			DBGOUT(FN; PTREXP(ep->client_msg->p->a); STRLIT("pushing old "); SYCEXP(ep->client_msg->p->a->app_key));
			MAY_DBG(FN;
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
			    );
			ep->msgno = ep->client_msg->p->a->app_key;
			ep->site = find_site_def(ep->msgno);
			if(!ep->site) /* Use current site if message is too old */
				ep->site = get_site_def();

			/* See if we can do anything with this message */
#if 0
	    	if (!ep->site || get_nodeno(ep->site) == VOID_NODE_NO) {
				/* Give up */
				deliver_to_app(NULL, ep->client_msg->p->a, delivery_failure);
				GOTO(next);
			}
#endif
retry_old:
			ep->p = get_cache(ep->msgno);
			assert(ep->p);
			lock = lock_pax_machine(ep->p);
			assert(!lock);

			/* Try to get a value accepted */
			learn_accepted_value(ep->site, ep->client_msg->p, ep->msgno);
			while (!finished(ep->p)) {
				/* Sleep here if value is not already chosen */
				TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
				if (!synode_eq(ep->msgno, ep->p->synode)) {
					DBGOUT(FN; STRLIT("proposer_task detected stolen state machine, retry"); );
					/* NOTE(review): the old machine is not unlocked before
					   retrying (the call below is commented out); confirm its
					   lock is released when the machine is reassigned. */
					/* unlock_pax_machine(ep->p); */
					GOTO(retry_old);
				}
				assert(synode_eq(ep->msgno, ep->p->synode));
				learn_accepted_value(ep->site, ep->client_msg->p, ep->msgno);
			}
			unlock_pax_machine(ep->p);
			msg_link_delete(&ep->client_msg);
			continue;
		}

		/* It is a new message */

		assert(!synode_eq(current_message, null_synode));

retry_new:
		/* Find a free slot */

		assert(!synode_eq(current_message, null_synode));
		ep->msgno = current_message;
		while (is_busy(ep->msgno)) {
			while (/* ! ep->client_msg->p->force_delivery &&  */ too_far(incr_msgno(ep->msgno))) { /* Too far ahead of executor */
				TIMED_TASK_WAIT(&exec_wait, 1.0);
				/* NOTE(review): ep->site is still the value from an earlier
				   iteration (or 0) at this point; debug output only. */
				DBGOUT(FN; TIMECEXP(ep->start_propose); TIMECEXP(ep->client_msg->p->a->expiry_time); TIMECEXP(task_now());

				    NDBG(enough_live_nodes(ep->site), d));
			}
			ep->msgno = incr_msgno(ep->msgno);
		}
		assert(!synode_eq(ep->msgno, null_synode));
		proposer_site = find_site_def_rw(ep->msgno);

		ep->site = proposer_site;

		/* See if we can do anything with this message */
#if 1
    	if (!ep->site || get_nodeno(ep->site) == VOID_NODE_NO) {
			/* Give up: no configuration for this slot, or we are not a member.
			   NOTE(review): ep->p may still point at an earlier iteration's
			   machine; `next` only prints it, but FINALLY unlocks it. */
			deliver_to_app(NULL, ep->client_msg->p->a, delivery_failure);
			GOTO(next);
		}
#endif
		DBGOHK(FN; STRLIT("changing current message"));
		set_current_message(ep->msgno);

		brand_client_msg(ep->client_msg->p, ep->msgno);
		ep->client_msg->p->a->lsn = ep->msgno.msgno;

		for(;;) { /* Loop until the client message has been learned */
			/* Get a Paxos instance to send the client message */
			ep->p = get_cache(ep->msgno);
			assert(ep->p);
			if(ep->client_msg->p->force_delivery)
				ep->p->force_delivery = ep->client_msg->p->force_delivery;
			lock = lock_pax_machine(ep->p);
			assert(!lock);

			/* Set the client message as current proposal */
			assert(ep->client_msg->p);
			replace_pax_msg(&ep->p->proposer.msg, clone_pax_msg(ep->client_msg->p));
      if (ep->p->proposer.msg == NULL) {
        /* Cloning failed: treated as fatal out-of-memory. */
        g_critical("Node %u has run out of memory while sending a message and "
                   "will now exit.",
                   get_nodeno(proposer_site));
        terminate_and_exit();
        TERMINATE;
      }
			assert(ep->p->proposer.msg);
			PAX_MSG_SANITY_CHECK(ep->p->proposer.msg);

			/* Create the prepare message */
			unchecked_replace_pax_msg(&ep->prepare_msg,
				pax_msg_new(ep->msgno, ep->site));
			DBGOUT(FN; PTREXP(ep->client_msg->p->a); STRLIT("pushing "); SYCEXP(ep->msgno));
			MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(ep->prepare_msg->a)));

			/* Full prepare/accept round when configured or when delivery is
			   forced; otherwise go straight to accept with ballot count 0. */
			if(threephase || ep->p->force_delivery){
				push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
			}else{
				push_msg_2p(ep->site, ep->p);
			}

			ep->start_push = task_now();

			while (!finished(ep->p)) { /* Try to get a value accepted */
				/* We will wake up periodically, and whenever a message arrives */
				TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
				if (!synode_eq(ep->msgno, ep->p->synode) || ep->p->proposer.msg == NULL) {
					DBGOHK(FN; STRLIT("detected stolen state machine, retry"); );
					/* unlock_pax_machine(ep->p); */
					GOTO(retry_new); /* Need to break out of both loops,
										and we have no "exit named loop" construction */
				}
				assert(synode_eq(ep->msgno, ep->p->synode) &&  ep->p->proposer.msg);
				if (finished(ep->p))
					break;
				 {
					 /* Still undecided after the current delay: re-push with a
					    new, higher ballot (always three-phase). */
					 double	now = task_now();
					if ((ep->start_push + ep->delay) <= now) {
						PAX_MSG_SANITY_CHECK(ep->p->proposer.msg);
						DBGOUT(FN; STRLIT("retry pushing "); SYCEXP(ep->msgno));
						MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(ep->prepare_msg->a));
						    );
						DBGOUT(BALCEXP(ep->p->proposer.bal);
						    BALCEXP(ep->p->acceptor.promise));
						MAY_DBG(FN; dbg_reply_set(ep->site, "prep_node_set", ep->p->proposer.prep_nodeset);
						    dbg_reply_set(ep->site, "prop_node_set", ep->p->proposer.prop_nodeset);
						    );
						push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
						ep->start_push = now;
					}
				}
			}
			/* When we get here, we know the value for this message number,
			   but it may not be the value we tried to push,
			   so loop until we have a successfull push. */
			unlock_pax_machine(ep->p);
			MAY_DBG(FN; STRLIT(" found finished message ");
			    SYCEXP(ep->msgno);
			    STRLIT("seconds since last push ");
			    NPUT(task_now() - ep->start_push, f);
			    STRLIT("ep->client_msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
			    );
			MAY_DBG(FN; STRLIT("ep->p->learner.msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p->learner.msg));
			    );
			/* Our value won the slot: done. Otherwise find another slot. */
			if (match_my_msg(ep->p->learner.msg, ep->client_msg->p)){
				break;
			} else
				GOTO(retry_new);
		}
next:
		 {
			/* Feed the end-to-end proposal time into the delay filter and
			   release the client message. */
			double	now = task_now();
			double	used = now -ep->start_propose;
			add_to_filter(used);
			DBGOUT(FN; STRLIT("completed ep->msgno ");
			    SYCEXP(ep->msgno); NDBG(used, f); NDBG(median_time(), f);
			    STRLIT("seconds since last push "); NDBG(now - ep->start_push, f); );
			MAY_DBG(FN; STRLIT("ep->client_msg ");
			    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
			    );
			if (ep->p) {
				MAY_DBG(FN; STRLIT("ep->p->learner.msg ");
						COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p->learner.msg));
				    );
			}
			msg_link_delete(&ep->client_msg);
		}
	}
	FINALLY
	    MAY_DBG(FN; STRLIT("exit "); NDBG(ep->self, d); NDBG(task_now(), f));
	if (ep->p)
		unlock_pax_machine(ep->p);
	replace_pax_msg(&ep->prepare_msg, NULL);
	if (ep->client_msg) { /* If we get here with a client message, we have failed to deliver */
		deliver_to_app(ep->p, ep->client_msg->p->a, delivery_failure);
		msg_link_delete(&ep->client_msg);

	}
	TASK_END;
}
1893 
1894 
1895 /* }}} */
1896 
1897 
1898 /* {{{ Executor task */
1899 
leader(site_def const * s)1900 static node_no	leader(site_def const *s)
1901 {
1902 	node_no	leader = 0;
1903 	for (leader = 0; leader < get_maxnodes(s); leader++) {
1904 		if (!may_be_dead(s->detected, leader, task_now()))
1905 			return leader;
1906 	}
1907 	return 0;
1908 }
1909 
1910 
iamthegreatest(site_def const * s)1911 int	iamthegreatest(site_def const *s)
1912 {
1913 	return leader(s) == s->nodeno;
1914 }
1915 
1916 
execute_msg(site_def const * site,pax_machine * pma,pax_msg * p)1917 void	execute_msg(site_def const * site, pax_machine *pma, pax_msg *p)
1918 {
1919 	app_data_ptr a = p->a;
1920 	DBGOUT(FN;
1921 	    COPY_AND_FREE_GOUT(dbg_pax_msg(p));
1922 	    );
1923 	if (a) {
1924 		switch (p->a->body.c_t) {
1925 		case unified_boot_type:
1926 	    case add_node_type:
1927 	    case remove_node_type:
1928       case force_config_type:
1929 
1930 			check_tasks();
1931 			break;
1932 		case xcom_recover:
1933 /* purecov: begin deadcode */
1934 			break;
1935 /* purecov: end */
1936 		case app_type:
1937 			MAY_DBG(FN; STRLIT(" learner.msg ");
1938 			    COPY_AND_FREE_GOUT(dbg_pax_msg(pma->learner.msg));
1939 			    );
1940 			deliver_to_app(pma, a, delivery_ok);
1941 			break;
1942 		case view_msg:
1943 			MAY_DBG(FN;
1944 			    STRLIT(" learner.msg ");
1945 			    COPY_AND_FREE_GOUT(dbg_pax_msg(pma->learner.msg)); );
1946 			if(site && site->global_node_set.node_set_len == a->body.app_u_u.present.node_set_len){
1947  				assert(site->global_node_set.node_set_len == a->body.app_u_u.present.node_set_len);
1948 				copy_node_set(&a->body.app_u_u.present, &(((site_def *)site)->global_node_set));
1949 				deliver_global_view_msg(site, p->synode);
1950 			}
1951 			break;
1952 #ifdef USE_EXIT_TYPE
1953 		case exit_type:
1954 			g_critical("Unable to get message, process will now exit. Please ensure that the process is restarted");
1955 			exit(1);
1956 			break;
1957 #endif
1958 		default:
1959 			break;
1960 		}
1961 	}
1962 	MAY_DBG(FN; SYCEXP(p->synode));
1963 }
1964 
1965 
1966 
/* Forward declarations: used by find_value() and get_xcom_message() before their definitions. */
static void	read_missing_values(int n);
static void	propose_missing_values(int n);
1969 
find_value(site_def const * site,unsigned int * wait,int n)1970 static void	find_value(site_def const *site, unsigned int *wait, int n)
1971 {
1972 	DBGOHK(FN; NDBG(*wait, d));
1973 
1974 	if(get_nodeno(site) == VOID_NODE_NO){
1975 		read_missing_values(n);
1976 		return;
1977 	}
1978 
1979 	switch (*wait) {
1980 	case 0:
1981 	case 1:
1982 		read_missing_values(n);
1983 		(*wait)++;
1984 		break;
1985 	case 2:
1986 		if (iamthegreatest(site))
1987 			propose_missing_values(n);
1988 		else
1989 			read_missing_values(n);
1990 		(*wait)++;
1991 		break;
1992 	case 3:
1993 		propose_missing_values(n);
1994 		break;
1995 	default:
1996 		break;
1997 	}
1998 }
1999 
get_xcom_message(pax_machine ** p,synode_no msgno,int n)2000 int	get_xcom_message(pax_machine **p, synode_no msgno, int n)
2001 {
2002 	DECL_ENV
2003 	    unsigned int	wait;
2004 	double	delay;
2005 	END_ENV;
2006 
2007 	TASK_BEGIN
2008 
2009 	    ep->wait = 0;
2010 	ep->delay = 0.0;
2011 	*p = get_cache(msgno);
2012 
2013 	while (!finished(*p)) {
2014 		site_def const * site = find_site_def(msgno);
2015 		DBGOHK(FN;
2016 		    STRLIT(" not finished ");
2017 		    SYCEXP(msgno); PTREXP(*p);
2018 		    NDBG(ep->wait, u);
2019 		    SYCEXP(msgno));
2020 		if (get_maxnodes(site) > 1 && iamthegreatest(site) &&
2021 		    site->global_node_set.node_set_val &&
2022 		    !site->global_node_set.node_set_val[msgno.node] &&
2023 		    may_be_dead(site->detected, msgno.node, task_now())){
2024 			propose_missing_values(n);
2025 		} else {
2026 			find_value(site, &ep->wait, n);
2027 		}
2028 		TIMED_TASK_WAIT(&(*p)->rv, ep->delay = wakeup_delay(ep->delay));
2029 		*p = get_cache(msgno);
2030 	}
2031 
2032 	FINALLY
2033 	    DBGOHK(FN; SYCEXP(msgno); PTREXP(*p); NDBG(ep->wait, u); SYCEXP(msgno));
2034 	TASK_END;
2035 }
2036 
set_executed_msg(synode_no msgno)2037 synode_no set_executed_msg(synode_no msgno)
2038 {
2039 	DBGOUT(FN; STRLIT("changing executed_msg from "); SYCEXP(executed_msg); STRLIT(" to "); SYCEXP(msgno));
2040 	if (synode_gt(msgno, current_message)) {
2041 		DBGOHK(FN; STRLIT("changing current message"));
2042 		set_current_message(first_free_synode(msgno));
2043 	}
2044 
2045 	if (msgno.msgno > executed_msg.msgno)
2046 		task_wakeup(&exec_wait);
2047 
2048 	executed_msg = msgno;
2049 	executor_site = find_site_def(executed_msg);
2050 	return executed_msg;
2051 }
2052 
2053 
first_free_synode(synode_no msgno)2054 static synode_no first_free_synode(synode_no msgno)
2055 {
2056 	site_def const * site = find_site_def(msgno);
2057  	synode_no retval = msgno;
2058 	if(get_group_id(site) == 0){
2059 		DBGOUT(FN; PTREXP(site); SYCEXP(msgno));
2060 		if(site){
2061             DBGOUT(FN; SYCEXP(site->boot_key); SYCEXP(site->start); COPY_AND_FREE_GOUT(dbg_site_def(site)));
2062 		}
2063 	}
2064 	assert(get_group_id(site) != 0);
2065 	assert(!synode_eq(msgno, null_synode));
2066 	if (retval.msgno == 0)
2067 		retval.msgno = 1;
2068 	retval.node = get_nodeno(site);
2069 	if (synode_lt(retval, msgno))
2070 		return incr_msgno(retval);
2071 	else
2072 		return retval;
2073 }
2074 
2075 
2076 
set_current_message(synode_no msgno)2077 synode_no set_current_message(synode_no msgno)
2078 {
2079 	MAY_DBG(FN; STRLIT("changing current_message from "); SYCEXP(current_message); STRLIT(" to "); SYCEXP(msgno));
2080 	return current_message = msgno;
2081 }
2082 
2083 
/* Forward declarations; the definitions are not in this part of the file. */
static void	handle_learn(site_def const * site, pax_machine *p, pax_msg *m);
static void	update_max_synode(pax_msg *p);
2086 
#if TASK_DBUG_ON
static void perf_dbg(int *_n, int *_old_n, double *_old_t) MY_ATTRIBUTE((unused));
/*
  Debug-only executor throughput report.

  When the call counter *_n (value before this call) is a multiple of 5000,
  prints node number, time, counter, median_time() and executed_msg.
  Always increments *_n. When more than 1.0 task_now() units have passed
  since *_old_t, prints the rate (calls per time unit) since that point and
  resets *_old_t and *_old_n.
*/
static void perf_dbg(int *_n, int *_old_n, double *_old_t)
{
	int	n = *_n;
	int	old_n = *_old_n;
	double	old_t = *_old_t;

	DBGOHK(FN; SYCEXP(executed_msg));
	if (!(n % 5000)) {
		GET_GOUT;
		NDBG(get_nodeno(get_site_def()), u);
		NDBG(task_now(), f);
		NDBG(n, d);
		NDBG(median_time(), f);
		SYCEXP(executed_msg);
		PRINT_GOUT;
		FREE_GOUT;
	}
	(*_n)++;
	if (task_now() - old_t > 1.0) {
		GET_GOUT;
		NDBG(get_nodeno(get_site_def()), u);
		NDBG(task_now(), f);
		NDBG(n, d);
		NDBG((n - old_n) / (task_now() - old_t), f);
		PRINT_GOUT;
		FREE_GOUT;
		*_old_t = task_now();
		*_old_n = n; /* pre-increment count, matching what was just reported */
	}
}
#endif
2120 
#ifdef IGNORE_LOSERS

/*
  Return nonzero if synode x belongs to a node that is not in the global
  node set of site, i.e. the executor should ignore its value.

  Returns 0 (not a loser, so the message is executed normally, as in the
  build without IGNORE_LOSERS) when site is NULL, its node set is not
  allocated, or x.node is outside the node set. Previously those cases
  dereferenced NULL or indexed out of bounds.
*/
static inline int	LOSER(synode_no x, site_def const *site)
{
	if (site == NULL ||
	    site->global_node_set.node_set_val == NULL ||
	    x.node >= site->global_node_set.node_set_len)
		return 0;

	DBGOUT(NEXP(x.node,u); NEXP(site->global_node_set.node_set_val[(x).node],d));
	return !site->global_node_set.node_set_val[x.node];
}

#else
#define LOSER(x, site) 0
#endif
2147 
2148 static void	debug_loser(synode_no x)  MY_ATTRIBUTE((unused));
2149 #if defined(TASK_DBUG_ON) && TASK_DBUG_ON
debug_loser(synode_no x)2150 static void	debug_loser(synode_no x)
2151 {
2152 	if (1 || x.msgno < 10) {
2153 		GET_GOUT;
2154 		NDBG(get_nodeno(find_site_def(x)), u);
2155 		STRLIT(" ignoring loser ");
2156 		SYCEXP(x);
2157 		SYCEXP(max_synode);
2158 		PRINT_GOUT;
2159 		FREE_GOUT;
2160 	}
2161 }
2162 #else
2163 /* purecov: begin deadcode */
debug_loser(synode_no x MY_ATTRIBUTE ((unused)))2164 static void	debug_loser(synode_no x MY_ATTRIBUTE((unused)))
2165 {
2166 }
2167 /* purecov: end */
2168 #endif
2169 
/* Debug hook used by the reconfiguration/exit code below. Currently disabled
   (expands to nothing); swap in the commented-out definition to enable it. */
/* #define DBGFIX2(x){ GET_GOUT; ADD_F_GOUT("%f ",task_now()); x; PRINT_GOUT; FREE_GOUT; } */
#define DBGFIX2(x)
send_value(site_def const * site,node_no to,synode_no synode)2172 static void	send_value(site_def const *site, node_no to, synode_no synode)
2173 {
2174 	pax_machine * pm = get_cache(synode);
2175 	if (pm && pm->learner.msg) {
2176 		pax_msg * msg = clone_pax_msg(pm->learner.msg);
2177 		if (msg == NULL) return;
2178 		ref_msg(msg);
2179 		send_server_msg(site, to, msg);
2180 		unref_msg(&msg);
2181 	}
2182 }
2183 
2184 /* Peturn message number where it is safe for nodes in prev config to exit */
compute_delay(synode_no start)2185 static synode_no compute_delay(synode_no start)
2186 {
2187 	start.msgno += event_horizon;
2188 	return start;
2189 }
2190 
2191 /* Push messages to all nodes which were in the previous site, but not in this */
inform_removed(int index,int all)2192 static void inform_removed(int index,int all)
2193 {
2194 	site_def * *sites = 0;
2195 	uint32_t	site_count = 0;
2196 	DBGFIX2(FN; NEXP(index, d));
2197 	get_all_site_defs(&sites, &site_count);
2198 	while (site_count > 1 && index >= 0 && (uint32_t)(index + 1) < site_count) {
2199 		site_def * s = sites[index];
2200 		site_def * ps = sites[index+1];
2201 
2202 		/* Compute diff and push messages */
2203 		DBGFIX2(FN; NDBG(index,d); PTREXP(s); if(s)SYCEXP(s->boot_key); PTREXP(ps); if(ps)SYCEXP(ps->boot_key));
2204 
2205 		if (s && ps) {
2206 			node_no i = 0;
2207 			DBGFIX2(FN; SYCEXP(s->boot_key); SYCEXP(s->start);
2208 			    SYCEXP(ps->boot_key); SYCEXP(ps->start));
2209 			for (i = 0; i < ps->nodes.node_list_len; i++) { /* Loop over prev site */
2210 				if (ps->nodeno != i && !node_exists(&ps->nodes.node_list_val[i], &s->nodes)) {
2211 					synode_no synode = s->start;
2212 					synode_no end = compute_delay(s->start);
2213 					while (!synode_gt(synode, end)) { /* Loop over relevant messages */
2214 						send_value(ps, i, synode);
2215 						synode = incr_synode(synode);
2216 					}
2217 				}
2218 			}
2219 		}
2220 		if(! all) /* Early exit if not all configs should be examined */
2221 			break;
2222 		index--;
2223 	}
2224 }
2225 
handle_add_node(app_data_ptr a)2226 site_def *handle_add_node(app_data_ptr a)
2227 {
2228 	site_def * site = clone_site_def(get_site_def());
2229 	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
2230 	MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
2231 	ADD_EVENTS(
2232 	    add_event(string_arg("a->app_key"));
2233 	    add_synode_event(a->app_key);
2234 	    );
2235 	assert(get_site_def());
2236 	assert(site);
2237 	add_site_def(a->body.app_u_u.nodes.node_list_len,
2238 	    a->body.app_u_u.nodes.node_list_val, site);
2239 	site->start = getstart(a);
2240 	site->boot_key = a->app_key;
2241 	site_install_action(site, a->body.c_t);
2242 	return site;
2243 }
2244 
terminate_and_exit()2245 static void	terminate_and_exit()
2246 {
2247 	XCOM_FSM(xa_terminate, int_arg(0));	/* Tell xcom to stop */
2248 	XCOM_FSM(xa_exit, int_arg(0));		/* Tell xcom to exit */
2249 	if (xcom_expel_cb) xcom_expel_cb(0);
2250 }
2251 
terminator_task(task_arg arg)2252 int	terminator_task(task_arg arg)
2253 {
2254 	DECL_ENV
2255 	    double t;
2256 	END_ENV;
2257 
2258 	TASK_BEGIN
2259 
2260 	    ep->t = get_double_arg(arg);
2261 		TASK_DELAY(ep->t);
2262 		terminate_and_exit();
2263 	FINALLY
2264 	TASK_END;
2265 }
2266 
delayed_terminate_and_exit(double t)2267 static void	delayed_terminate_and_exit(double t)
2268 {
2269 	task_new(terminator_task, double_arg(t), "terminator_task", XCOM_THREAD_DEBUG);
2270 }
2271 
is_empty_site(site_def const * s)2272 static inline int is_empty_site(site_def const *s)
2273 {
2274 	return s->nodes.node_list_len == 0;
2275 }
2276 
handle_remove_node(app_data_ptr a)2277 site_def *handle_remove_node(app_data_ptr a)
2278 {
2279 	site_def * site = clone_site_def(get_site_def());
2280 	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)));
2281 	ADD_EVENTS(
2282 	    add_event(string_arg("a->app_key"));
2283 	    add_synode_event(a->app_key);
2284 	    add_event(string_arg("nodeno"));
2285 	    add_event(uint_arg(get_nodeno(site)));
2286 	);
2287 
2288 	remove_site_def(a->body.app_u_u.nodes.node_list_len,
2289 	    a->body.app_u_u.nodes.node_list_val, site);
2290 	site->start = getstart(a);
2291 	site->boot_key = a->app_key;
2292 	site_install_action(site, a->body.c_t);
2293 	return site;
2294 }
2295 
handle_config(app_data_ptr a)2296 void handle_config(app_data_ptr a)
2297 {
2298 	while(a){
2299 		switch (a->body.c_t) {
2300 			case unified_boot_type:
2301 				install_node_group(a);
2302 				break;
2303 			case add_node_type:
2304 				handle_add_node(a);
2305 				break;
2306 			case remove_node_type:
2307 				handle_remove_node(a);
2308                 if(xcom_shutdown)
2309 					return;
2310 				break;
2311 			case force_config_type:
2312 				install_node_group(a);
2313 				break;
2314 			default:
2315 				break;
2316 		}
2317 		a = a->next;
2318 	}
2319 }
2320 
/* States of the executor task's main loop. */
typedef enum exec_state {
	FETCH = 0,	/* learn messages, advancing executed_msg */
	EXECUTE = 1	/* deliver learned messages, advancing delivered_msg */
} exec_state;

/* Switch the executor task (environment pointer ep) to state x. */
#define NEXTSTATE(x) ep->state = (x)
2328 
2329 static 	synode_no delivered_msg;
2330 
get_delivered_msg()2331 synode_no get_delivered_msg()
2332 {
2333 	return delivered_msg;
2334 }
2335 
is_member(site_def const * site)2336 static inline int is_member(site_def const *site)
2337 {
2338 	return site->nodeno != VOID_NODE_NO;
2339 }
2340 
2341 /*
2342 Execute xcom message stream.
2343 
2344 Beware of the exit logic in this task, which is both simple and not so simple.
Consider three configs C1, C2 and C3. C1 has two nodes, A and B. C2 has only node B.
2346 C3 is empty.
2347 A config with message number N will be activated after a delay of (at least)
2348 alpha messages, where alpha is the size of the pipeline (or the event horizon).
2349 
2350 So, C1.start = C1+alpha,
2351 and C2.start = C2+alpha. A, which is removed from C1, cannot exit until a majority
2352 of nodes in the new config C2 (in this case B) has learned all the messages from
2353 config C1, which means all messages less than C2.start. How can A know that a majority
2354 of C2 has learned those messages?
2355 
2356 If we denote the first message that is not yet decided (and executed) by E,
2357 the proposers will not try to propose messages with number >= E+alpha,
2358 and all incoming tcp messages with message number >= E+alpha will be ignored.
2359 E is incremented by the executor task, so all messages < E are known.
2360 This means that when the value of E+alpha is known, all messages up to
2361 and including E are also known, although not all messages E+1..E+alpha-1
2362 necessarily are known.
2363 
2364 This leads to the requirement that a node which is removed (A) needs to wait until
2365 it knows the value of C2.start+alpha, since by then it knows that a majority
2366 of the nodes in C2 are ready to execute C2.start, which in turn implies that
2367 a majority of nodes in C2 knows all the values from config C1. Note that the last
2368 message that should be delivered to the application by a node that is leaving C1 is
2369 C2.start-1, which is the last message of C1.
2370 
2371 How does a node that is removed get to know values from the next config?
2372 There are two ways, and we use both. First, the node that tries to exit can
2373 simply ask for the message. get_xcom_message() will do this for all messages
2374 <= max_synode, but it may take some time.
2375 Second, the nodes of C2 can send the messages C2.start..C2.start+alpha
2376 to the nodes that are removed (nodes that are in C1 but not in C2).
2377 inform_removed() does this. We take care to handle the case where configs are close enough
2378 that C0 < C1 <= C0+alpha by tracking the oldest config that contains nodes that are
2379 leaving.
2380 
2381 This takes care of nodes leaving C1. What about nodes that leave C2? C3 is empty,
2382 so B, which is leaving C2, cannot wait for messages from C3. But since C3 is empty,
2383 there is no need to wait. It can exit immediately after having executed C3.start-1, the
2384 last message of C2. What if C3.start-1 < C2.start+alpha? This can happen if C2 and C3
2385 are close. In that case, B will exit before A gets the chance to learn C2.start+alpha,
2386 which will leave A hanging forever. Clearly, we need to impose an additional constraint,
2387 that C3.start must be greater than C2.start+alpha. This is taken care of by the special
2388 test for an empty config.
2389 
2390 Complicated and confusing? Not really, but there is a clean and simple solution which has
2391 not been implemented yet, since it requires more changes to the consensus logic.
2392 If we require that for the messages C2..C2.start-1 we have a majority from both the nodes
2393 in C1 and the nodes in C2, the nodes not in C2 can exit when they have executed message
2394 C2.start-1, since we then know that a majority of the nodes of C2 has agreed on those messages
2395 as well, so they do not depend on the nodes not in C2 any more. This holds even if C2 is empty.
2396 Note that requiring a majority from both C1 and C2 is different from requiring a majority from
2397 C1+C2, which means that the proposer logic needs to consider answers from two different sets of
2398 acceptors for those messages. Since acceptors are identified by their node number, and the node
numbers need not be the same for both configs, we need to maintain a mapping between the node
2400 numbers of any two consecutive configs. Alternatively, we could remove the node numbers altogether,
2401 and always use a unique, unchanging ID for a node, like IP address + port.
2402 
2403 */
2404 
2405 /* FIFO which tracks the message numbers where we should deliver queued messages or
2406 inform the removed nodes */
2407 #define FIFO_SIZE 1000
2408 static struct {
2409 	int n;
2410 	int front;
2411 	int rear;
2412 	synode_no q[FIFO_SIZE];
2413 }delay_fifo;
2414 
addone(int i)2415 static inline int addone(int i)
2416 {
2417 	return ((i + 1) % FIFO_SIZE);
2418 }
2419 
2420 /* Is queue empty?  */
fifo_empty()2421 static inline int fifo_empty()
2422 {
2423 	return delay_fifo.n <= 0;
2424 }
2425 
2426 /* Is queue full?  */
fifo_full()2427 static inline int fifo_full()
2428 {
2429 	return delay_fifo.n >= FIFO_SIZE;
2430 }
2431 
2432 
2433 /* Insert in queue  */
fifo_insert(synode_no s)2434 static inline void fifo_insert(synode_no s)
2435 {
2436 	if(! fifo_full()){
2437 		delay_fifo.n++;
2438 		delay_fifo.q[delay_fifo.rear] = s;
2439 		delay_fifo.rear = addone(delay_fifo.rear);
2440 	}
2441 }
2442 
2443 /* Extract first from queue  */
fifo_extract()2444 static inline synode_no fifo_extract()
2445 {
2446 	if(! fifo_empty()){
2447 		synode_no ret = delay_fifo.q[delay_fifo.front];
2448 		delay_fifo.front = addone(delay_fifo.front);
2449 		delay_fifo.n--;
2450 		return ret;
2451 	}else{
2452 		return null_synode;
2453 	}
2454 }
2455 
2456 /* Return first in queue, but do not dequeue  */
fifo_front()2457 static inline synode_no fifo_front()
2458 {
2459 	if(! fifo_empty()){
2460 		return delay_fifo.q[delay_fifo.front];
2461 	}else{
2462 		return null_synode;
2463 	}
2464 }
2465 
executor_task(task_arg arg MY_ATTRIBUTE ((unused)))2466 static int	executor_task(task_arg arg MY_ATTRIBUTE((unused)))
2467 {
2468 	DECL_ENV
2469 	    pax_machine * p;
2470 	int	n ;
2471 	int	old_n;
2472 	double	old_t;
2473 	synode_no exit_synode;
2474 	exec_state state;
2475 	enum {
2476 		no_exit,
2477 		not_member_exit,
2478 		empty_exit
2479 	} exit_type;
2480 	int inform_index;
2481 	END_ENV;
2482 
2483 	TASK_BEGIN
2484 	    ep->p = NULL;
2485 	ep->n = 0;
2486 	ep->old_n = 0;
2487 	ep->old_t = task_now();
2488 	ep->exit_synode = null_synode;
2489 	ep->exit_type = no_exit;
2490 	ep->inform_index = -1;
2491 	delay_fifo.n = 0;
2492 	delay_fifo.front = 0;
2493 	delay_fifo.rear = 0;
2494 
2495 	set_last_received_config(null_synode);
2496 
2497 	if (executed_msg.msgno == 0)
2498 		executed_msg.msgno = 1;
2499 	delivered_msg = executed_msg;
2500 	NEXTSTATE(FETCH);
2501 	executor_site = find_site_def(executed_msg);
2502 
2503 	while (!xcom_shutdown) {
2504 		for (; ; ) {
2505 			if (ep->state == FETCH) {
2506 				if ( !LOSER(executed_msg, executor_site)) {
2507 					TASK_CALL(get_xcom_message(&ep->p, executed_msg, FIND_MAX));
2508 					DBGOUT(FN; STRLIT("got message "); SYCEXP(ep->p->synode); COPY_AND_FREE_GOUT(dbg_app_data(XAPP)));
2509 					/* Execute unified_boot immediately, but do not deliver site message until we */
2510 					/* are ready to execute messages from the new site definition. */
2511 					/* At that point we can be certain that a majority have learned */
2512 					/* everything from the old site. */
2513 
2514 					if ((XAPP) && is_config((XAPP)->body.c_t) &&
2515 						synode_gt(executed_msg, get_site_def()->boot_key)) /* Redo test */
2516 					{
2517 						site_def * site = 0;
2518 						set_last_received_config(executed_msg);
2519 						handle_config(XAPP);
2520 						garbage_collect_site_defs(delivered_msg);
2521 						check_tasks();
2522 						site = get_site_def_rw();
2523 						if (site == 0) {
2524 							TERMINATE;
2525 						}
2526 						DBGFIX2(FN; STRLIT("new config "); SYCEXP(site->boot_key); ); /*SYCEXP(site->start); NEXP(get_nodeno(site),d); NEXP(ARBITRATOR_HACK,d);
2527 								NEXP(ep->exit_type,d); SYCEXP(ep->exit_synode);
2528 								SYCEXP(executed_msg); SYCEXP(max_synode)); */
2529 
2530 						/* If site is empty, increase start to allow nodes to terminate before start */
2531 						if (is_empty_site(site)) {
2532 							site->start = compute_delay(compute_delay(site->start));
2533 						}
2534 						if (ep->exit_type == no_exit){/* We have not yet set the exit trigger */
2535 							synode_no delay_until;
2536 							if(is_member(site)){
2537 								delay_until = compute_delay(site->start);
2538 							} else { /* Not in this site */
2539 								/*
2540 									See if site will be empty when we leave.
2541 									If the new site is empty, we should exit after having
2542 									delivered the last message from the old site.
2543 								*/
2544 								if (is_empty_site(site)) {
2545 									ep->exit_synode = decr_synode(site->start);
2546 									ep->exit_type = empty_exit;
2547 									delay_until = ep->exit_synode;
2548 									DBGFIX2(FN; SYCEXP(ep->exit_synode); SYCEXP(executed_msg); SYCEXP(max_synode));
2549 								}else{
2550 									/*
2551 										If we are not a member of the new site, we should exit after having
2552 										seen enough messages from the new site.
2553 									 */
2554 									ep->exit_synode = compute_delay(site->start);
2555 									ep->exit_type = not_member_exit;
2556 									if (!synode_lt(ep->exit_synode, max_synode)){
2557 										/* We need messages from the next site, so set max_synode accordingly. */
2558 										set_max_synode(incr_synode(ep->exit_synode));
2559 									}
2560 									delay_until = ep->exit_synode;
2561 									DBGFIX2(FN; SYCEXP(delay_until); SYCEXP(executed_msg); SYCEXP(max_synode));
2562 									DBGFIX2(FN; SYCEXP(ep->exit_synode); SYCEXP(executed_msg); SYCEXP(max_synode));
2563 								}
2564 							}
2565 
2566 							if (synode_gt(delay_until, max_synode))
2567 								set_max_synode(delay_until);
2568 							fifo_insert(delay_until);
2569 							ep->inform_index++;
2570 						}
2571 					} else {
2572 						DBGOUT(FN; SYCEXP(executed_msg); SYCEXP(get_site_def()->boot_key));
2573 					}
2574 				} else {
2575 					DBGOUT(FN; debug_loser(executed_msg); PTREXP(executor_site);
2576 						COPY_AND_FREE_GOUT(dbg_node_set(executor_site->global_node_set)));
2577 				}
2578 				DBGOUT(FN; NDBG(ep->state, d); SYCEXP(delivered_msg); SYCEXP(executed_msg);
2579 					SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
2580 
2581 				/* See if we should exit when having seen this message */
2582 				if (ep->exit_type == not_member_exit && synode_eq(executed_msg, ep->exit_synode)) {
2583 					inform_removed(ep->inform_index, 1); /* Inform all removed nodes before we exit */
2584 					delayed_terminate_and_exit(TERMINATE_DELAY);	/* Tell xcom to stop */
2585 					TERMINATE;
2586 				}
2587 
2588 				if (fifo_empty()) {
2589 					NEXTSTATE(EXECUTE);
2590 				} else if (synode_eq(executed_msg, fifo_front())) {
2591 					DBGFIX2(FN; SYCEXP(fifo_front()); SYCEXP(executed_msg);
2592 							SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
2593 					while(synode_eq(executed_msg, fifo_front())){ /* More than one may match */
2594 						inform_removed(ep->inform_index, 0);
2595 						fifo_extract();
2596 						ep->inform_index--;
2597 					}
2598 					garbage_collect_servers();
2599 					NEXTSTATE(EXECUTE);
2600 				}
2601 				SET_EXECUTED_MSG(incr_synode(executed_msg));
2602 				MAY_DBG(FN; NDBG(ep->state, d); SYCEXP(fifo_front()); SYCEXP(executed_msg));
2603 				MAY_DBG(FN; NDBG(ep->state, d); SYCEXP(ep->exit_synode); SYCEXP(executed_msg));
2604 			} else if (ep->state == EXECUTE) {
2605 				site_def const * x_site = find_site_def(delivered_msg);
2606 
2607 				DBGOUT(FN; NDBG(ep->state, d); SYCEXP(delivered_msg); SYCEXP(delivered_msg); SYCEXP(executed_msg);
2608 					SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
2609 				ep->p = get_cache(delivered_msg);
2610 				ADD_EVENTS(
2611 					add_event(string_arg("executing message"));
2612 					add_synode_event(ep->p->synode);
2613 				);
2614 				if (LOSER(delivered_msg, x_site)) {
2615 #ifdef IGNORE_LOSERS
2616 					DBGOUT(FN; debug_loser(delivered_msg); PTREXP(x_site); dbg_node_set(x_site->global_node_set));
2617 #endif
2618 				} else if ((ep->p)->learner.msg->msg_type != no_op) {
2619 					execute_msg(find_site_def(delivered_msg), ep->p, ep->p->learner.msg);
2620 #if defined(TASK_DBUG_ON) && TASK_DBUG_ON
2621 					DBGOUT(perf_dbg(&ep->n, &ep->old_n, &ep->old_t));
2622 #endif
2623 				}
2624 				/* Garbage collect old servers */
2625 				if (synode_eq(delivered_msg, x_site->start)) {
2626 					garbage_collect_servers();
2627 				}
2628 				/* See if we should exit when having delivered this message */
2629 				if (ep->exit_type == empty_exit && synode_eq(delivered_msg, ep->exit_synode)) {
2630 					inform_removed(ep->inform_index, 1); /* Inform all removed nodes before we exit */
2631 					delayed_terminate_and_exit(TERMINATE_DELAY);	/* Tell xcom to stop */
2632 					TERMINATE;
2633 				}
2634 				delivered_msg = incr_synode(delivered_msg);
2635 				if (synode_eq(delivered_msg, executed_msg)) {
2636 					NEXTSTATE(FETCH);
2637 				}
2638 			} else {
2639 				abort();
2640 			}
2641 		}
2642 	}
2643 	FINALLY
2644 		DBGOUT(FN; STRLIT(" shutdown "); SYCEXP(executed_msg); NDBG(task_now(), f));
2645 	TASK_END;
2646 }
2647 
2648 
get_sweep_start()2649 static synode_no  get_sweep_start()
2650 {
2651 	synode_no    find = executed_msg;
2652 	find.node = get_nodeno(find_site_def(find));
2653 	if (find.node < executed_msg.node) {
2654 		find = incr_msgno(find);
2655 	}
2656 	return find;
2657 }
2658 
2659 
sweeper_task(task_arg arg MY_ATTRIBUTE ((unused)))2660 static int	sweeper_task(task_arg arg MY_ATTRIBUTE((unused)))
2661 {
2662 	DECL_ENV
2663 	    synode_no find;
2664 	END_ENV;
2665 
2666 	TASK_BEGIN
2667 
2668 	    ep->find = get_sweep_start();
2669 
2670 	while (!xcom_shutdown) {
2671 		ep->find.group_id = executed_msg.group_id; /* In case group id has changed */
2672 #ifndef AGGRESSIVE_SWEEP
2673 		while (!is_only_task()) {
2674 			TASK_YIELD;
2675 		}
2676 #endif
2677 		ADD_EVENTS(
2678 			add_event(string_arg("sweeper ready"));
2679 			add_synode_event(executed_msg);
2680 		);
2681 /*		DBGOUT(FN; STRLIT("ready to run ");   */
2682 /*			SYCEXP(executed_msg); SYCEXP(max_synode); SYCEXP(ep->find));  */
2683 		 {
2684 			while (synode_lt(ep->find, max_synode) && ! too_far(ep->find)) {
2685 				/* pax_machine * pm = hash_get(ep->find); */
2686 				pax_machine * pm = 0;
2687 				ADD_EVENTS(
2688 					add_event(string_arg("sweeper examining"));
2689 					add_synode_event(ep->find);
2690 				);
2691 				DBGOUT(FN; STRLIT("examining "); SYCEXP(ep->find));
2692 				if (ep->find.node == VOID_NODE_NO) {
2693 					if(synode_gt(executed_msg, ep->find)){
2694 						ep->find = get_sweep_start();
2695 					}
2696 					if (ep->find.node == VOID_NODE_NO)
2697 						goto deactivate;
2698 				}
2699 				pm = get_cache(ep->find);
2700 				if (pm && !pm->force_delivery) { /* We want full 3 phase Paxos for forced messages */
2701 					/* DBGOUT(FN; dbg_pax_machine(pm)); */
2702 					if (!is_busy_machine(pm) && pm->acceptor.promise.cnt == 0 && ! pm->acceptor.msg && !finished(pm)) {
2703 						pm->op = skip_op;
2704 						ADD_EVENTS(
2705 							add_event(string_arg("sweeper skipping"));
2706 							add_synode_event(ep->find);
2707 							add_event(string_arg(pax_op_to_str(pm->op)));
2708 						);
2709 						skip_msg(pax_msg_new(ep->find, find_site_def(ep->find)));
2710 						MAY_DBG(FN; STRLIT("skipping "); SYCEXP(ep->find));
2711 /* 						MAY_DBG(FN; dbg_pax_machine(pm)); */
2712 					}
2713 				}
2714 				ep->find = incr_msgno(ep->find);
2715 			}
2716 		}
2717 	deactivate:
2718 		TASK_DEACTIVATE;
2719 	}
2720 	FINALLY
2721 	    MAY_DBG(FN; STRLIT(" shutdown sweeper "); SYCEXP(executed_msg); NDBG(task_now(), f));
2722 	TASK_END;
2723 }
2724 
2725 
2726 /* }}} */
2727 
/*
  Compute how long get_xcom_message() waits before its next retry.

  First call (old == 0.0):
    m = median_time(), replaced by default_median when it is 0.0 or
    above median_cap; delay = base_delay + median_factor * m plus a random
    jitter of m * my_drand48(), so that nodes do not retry in lock step.
    NOTE(review): the jitter bound assumes my_drand48() returns [0,1).
  Later calls:
    the previous delay is multiplied by backoff (about sqrt(2)).
  In both cases the result is divided by shrink until it does not exceed
  max_delay.

  The values are those of the previously active branch. The disabled
  "#if 0" variant (caps 1.0/10.0, factor 10.0) was dead code and has been
  removed.

  @param old  previous value returned by this function, or 0.0 to restart
  @return     the next delay, in the units TIMED_TASK_WAIT uses
*/
static double	wakeup_delay(double old)
{
	double const median_cap = 0.3;		/* medians above this are not trusted */
	double const default_median = 0.1;	/* used when the median is missing or not trusted */
	double const base_delay = 0.1;
	double const median_factor = 5.0;
	double const backoff = 1.4142136;	/* Exponential backoff factor, ~sqrt(2) */
	double const max_delay = 3.0;
	double const shrink = 1.31415926;
	double	retval = 0.0;

	if (0.0 == old) {
		double	m = median_time();
		if (m == 0.0 || m > median_cap)
			m = default_median;
		retval = base_delay + median_factor * m + m * my_drand48();
	} else {
		retval = old * backoff;
	}
	while (retval > max_delay)
		retval /= shrink;
	return retval;
}
2767 
propose_noop(synode_no find,pax_machine * p)2768 static void	propose_noop(synode_no find, pax_machine *p)
2769 {
2770 	/* Prepare to send a noop */
2771    	site_def const *site = find_site_def(find);
2772   pax_msg *clone = NULL;
2773 	assert(! too_far(find));
2774 	replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
2775 	assert(p->proposer.msg);
2776 	create_noop(p->proposer.msg);
2777 /*  	DBGOUT(FN; SYCEXP(find););  */
2778 
2779   clone = clone_pax_msg(p->proposer.msg);
2780   if (clone != NULL) {
2781     push_msg_3p(site, p, clone, find, no_op);
2782   } else {
2783     G_DEBUG("Unable to propose NoOp due to an OOM error.");
2784   }
2785 }
2786 
2787 
send_read(synode_no find)2788 static void	send_read(synode_no find)
2789 {
2790 	/* Prepare to send a read_op */
2791 	site_def const *site = find_site_def(find);
2792  	MAY_DBG(FN; NDBG(get_maxnodes(site),u); NDBG(get_nodeno(site),u););
2793 	if (site && find.node != get_nodeno(site)) {
2794 		pax_msg * pm = pax_msg_new(find, site);
2795 		ref_msg(pm);
2796 		create_read(site, pm);
2797   		MAY_DBG(FN; SYCEXP(find););
2798 
2799  		MAY_DBG(FN; NDBG(get_maxnodes(site),u); NDBG(get_nodeno(site),u); PTREXP(pm));
2800 		/* send_server_msg(site, find.node, pm); */
2801 #if 0
2802 		send_to_others(site, pm, "send_read");
2803 #else
2804 		if(get_nodeno(site) == VOID_NODE_NO)
2805 			send_to_others(site, pm, "send_read");
2806 		else
2807 			send_to_someone(site, pm, "send_read");
2808 #endif
2809 		unref_msg(&pm);
2810 	}
2811 }
2812 
2813 
2814 /* }}} */
2815 
2816 /* {{{ Find missing values */
2817 
ok_to_propose(pax_machine * p)2818 static int	ok_to_propose(pax_machine *p)
2819 {
2820 #if 0
2821 	site_def const *s = find_site_def(p->synode.group_id);
2822 	int	retval = (p->synode.node == get_nodeno(s) || task_now() -p->last_modified > DETECTOR_LIVE_TIMEOUT || may_be_dead(s->detected, p->synode.node, task_now()))
2823 	 && !recently_active(p) && !finished(p) && !is_busy_machine(p);
2824 #else
2825 	int	retval = !recently_active(p) && !finished(p) && !is_busy_machine(p);
2826 #endif
2827 	MAY_DBG(FN; NDBG(p->synode.node, u); NDBG(recently_active(p),d); NDBG(finished(p),d); NDBG(is_busy_machine(p),d); NDBG(retval, d));
2828 	return retval;
2829 }
2830 
2831 
read_missing_values(int n)2832 static void	read_missing_values(int n)
2833 {
2834 	synode_no find = executed_msg;
2835 	synode_no end = max_synode;
2836 	int	i = 0;
2837 
2838 	MAY_DBG(FN; SYCEXP(find); SYCEXP(end));
2839 	if (synode_gt(executed_msg, max_synode) ||
2840 	    synode_eq(executed_msg, null_synode))
2841 		return;
2842 
2843 	while (!synode_gt(find, end) && i < n && ! too_far(find)) {
2844 		pax_machine * p = get_cache(find);
2845 		ADD_EVENTS(
2846 			add_synode_event(find);
2847 			add_synode_event(end);
2848 			add_event(string_arg("active "));
2849 			add_event(int_arg(recently_active(p)));
2850 			add_event(string_arg("finished  "));
2851 			add_event(int_arg(finished(p)));
2852 			add_event(string_arg("busy "));
2853 			add_event(int_arg(is_busy_machine(p)));
2854 		);
2855 		MAY_DBG(FN; SYCEXP(find); SYCEXP(end); NDBG(recently_active(p), d); NDBG(finished(p), d); NDBG(is_busy_machine(p), d));
2856 
2857 
2858 		if (!recently_active(p) && !finished(p) && !is_busy_machine(p)) {
2859 			send_read(find);
2860 		}
2861 		find = incr_synode(find);
2862 		i++;
2863 	}
2864 }
2865 
2866 
propose_missing_values(int n)2867 static void	propose_missing_values(int n)
2868 {
2869 	synode_no find = executed_msg;
2870 	synode_no end = max_synode;
2871 	int	i = 0;
2872 
2873 	DBGOHK(FN; NDBG(get_maxnodes(get_site_def()), u); SYCEXP(find); SYCEXP(end));
2874 	if (
2875 	    synode_gt(executed_msg, max_synode) ||
2876 	    synode_eq(executed_msg, null_synode))
2877 		return;
2878 
2879 	MAY_DBG(FN; SYCEXP(find); SYCEXP(end));
2880 	i = 0;
2881 	while (!synode_gt(find, end) && i < n && ! too_far(find)) {
2882 		pax_machine * p = get_cache(find);
2883  		 DBGOHK(FN; NDBG(ok_to_propose(p),d);  TIMECEXP(task_now());  TIMECEXP(p->last_modified); SYCEXP(find))
2884  		    ;
2885 		if(get_nodeno(find_site_def(find)) == VOID_NODE_NO)
2886 			break;
2887 		if (ok_to_propose(p)) {
2888 			if (task_now() - BUILD_TIMEOUT > p->last_modified){
2889 				propose_noop(find, p);
2890 			}
2891 		}
2892 		find = incr_synode(find);
2893 		i++;
2894 	}
2895 }
2896 
2897 
2898 /* Propose a noop for the range find..end */
request_values(synode_no find,synode_no end)2899 void	request_values(synode_no find, synode_no end)
2900 {
2901 	DBGOUT(FN; SYCEXP(find); SYCEXP(find); SYCEXP(end); );
2902 	while (!synode_gt(find, end) && ! too_far(find)) {
2903 		pax_machine * p = get_cache(find);
2904 		site_def const *site = find_site_def(find);
2905 		if(get_nodeno(site) == VOID_NODE_NO)
2906 			break;
2907 		if (!finished(p) && !is_busy_machine(p)) {
2908 			/* Prepare to send a noop */
2909 			replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
2910 			assert(p->proposer.msg);
2911 			create_noop(p->proposer.msg);
2912 
2913 			DBGOUT(FN; STRLIT("propose"); SYCEXP(find); );
2914 			push_msg_3p(site, p, pax_msg_new(find, site), find, no_op);
2915 		}
2916 		find = incr_synode(find);
2917 	}
2918 }
2919 
2920 
2921 /* }}} */
2922 
2923 /* {{{ Message handlers */
#if 0
void	reply_msg(site_def const * site, pax_msg *m)
{
	MAY_DBG(FN; );
	if (get_server(s, m->from)) {
		send_server_msg(s, m->from, m);
	}
}

#else
/*
  Deliver reply message `m` back to its sender, m->from.

  NOTE: this macro uses the identifiers `site` and `reply_queue` from the
  scope in which it is expanded; both must be visible at every use.

  - If the sender is the local node, the reply is dispatched directly.
  - Otherwise, if m->from is a node of `site`, the message carries the
    site's group id, and a server entry exists, it is sent to that server.
  - Otherwise it is linked into `reply_queue`.

  Wrapped in do { } while (0) so that `reply_msg(x);` is a single statement
  and stays correct inside an unbraced if/else.
*/
#define reply_msg(m) \
do { \
	if (is_local_node((m)->from, site)) { \
		dispatch_op(site, (m), NULL); \
	} else if (node_no_exists((m)->from, site) && \
		   (m)->group_id == get_group_id(site) && \
		   get_server(site, (m)->from)) { \
		send_server_msg(site, (m)->from, (m)); \
	} else { \
		link_into(&(msg_link_new((m), (m)->from)->l), reply_queue); \
	} \
} while (0)
#endif
2947 
2948 
/*
  CREATE_REPLY(x) declares a local `pax_msg *reply` and fills it with a clone
  of `x` via CLONE_PAX_MSG. Because it declares a variable, it must appear
  where a declaration is allowed, and at most once per scope.

  SEND_REPLY delivers `reply` with reply_msg() and then releases it through
  replace_pax_msg(&reply, NULL). It is wrapped in do { } while (0) so it
  behaves as one statement, including inside an unbraced if/else.
*/
#define CREATE_REPLY(x) pax_msg *reply = NULL; CLONE_PAX_MSG(reply, x)
#define SEND_REPLY do { reply_msg(reply); replace_pax_msg(&reply, NULL); } while (0)
2951 
safe_app_data_copy(pax_msg ** target,app_data_ptr source)2952 bool_t safe_app_data_copy(pax_msg **target, app_data_ptr source) {
2953   copy_app_data(&(*target)->a, source);
2954   if ((*target)->a == NULL && source != NULL) {
2955     oom_abort = 1;
2956     replace_pax_msg(target, NULL);
2957     return FALSE;
2958   }
2959   return TRUE;
2960 }
2961 
/*
  Send the value this node has learned for machine `p` back to the sender
  of `pm`, as a learn message for instance `synode`. The reply carries the
  learned message's ballot, message type and a copy of its app_data.
  If copying the app_data fails, the reply has already been released by
  safe_app_data_copy() and nothing is sent.
*/
static void	teach_ignorant_node(site_def const * site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
{
	pax_msg *learned = p->learner.msg;
	CREATE_REPLY(pm);

	DBGOUT(FN; SYCEXP(synode));
	reply->synode = synode;
	reply->proposal = learned->proposal;
	reply->msg_type = learned->msg_type;

	safe_app_data_copy(&reply, learned->a);
	if (reply == NULL)
		return;

	set_learn_type(reply);
	SEND_REPLY;
}
2976 
2977 
2978 /* Handle incoming read */
handle_read(site_def const * site,pax_machine * p,linkage * reply_queue,pax_msg * pm)2979 static void	handle_read(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
2980 {
2981 	DBGOUT(FN;
2982 	    BALCEXP(pm->proposal);
2983 	    BALCEXP(p->acceptor.promise);
2984 	    if (p->acceptor.msg)
2985 	    BALCEXP(p->acceptor.msg->proposal);
2986 	    STRLIT("type "); STRLIT(pax_msg_type_to_str(pm->msg_type)))         ;
2987 
2988 	if (finished(p)) { /* We have learned a value */
2989 		teach_ignorant_node(site, p, pm, pm->synode, reply_queue);
2990 	}
2991 }
2992 
2993 
/*
  Handlers compiled only when USE_EXIT_TYPE is defined.
*/
#ifdef USE_EXIT_TYPE
/*
  Answer prepare `pm` with an ack_prepare_op of type `normal` whose new
  app_data has cargo type exit_type.
  NOTE(review): new_app_data()'s result is dereferenced without a NULL check.
*/
static void	miss_prepare(site_def const * site, pax_msg *pm, linkage *reply_queue)
{
	CREATE_REPLY(pm);
	DBGOUT(FN; SYCEXP(pm->synode));
	reply->msg_type = normal;
	reply->a = new_app_data();
	reply->a->body.c_t = exit_type;
	reply->op = ack_prepare_op;
	SEND_REPLY;
}


/*
  Answer accept `pm` with an ack_accept_op of type `normal`.
  NOTE(review): the code below sends the incoming `pm` (not `reply`) to
  servers[pm->from]. It also takes an extra ref_msg(reply) that the single
  release in SEND_REPLY does not appear to balance. Both look suspicious;
  confirm before enabling USE_EXIT_TYPE.
*/
static void	miss_accept(site_def const * site, pax_msg *pm, linkage *reply_queue)
{
	CREATE_REPLY(pm);
	DBGOUT(FN; SYCEXP(pm->synode));
	ref_msg(reply);
	reply->msg_type = normal;
	reply->op = ack_accept_op;
	if (servers[pm->from]) {
		send_server_msg(site, pm->from, pm);
	}
	SEND_REPLY;
}


#endif
3022 
/*
  Paxos acceptor, phase 1 (prepare), for instance `synode`.
  - If a value has already been learned, teach it to the sender instead.
  - If pm's ballot is greater than our promise, raise the promise to it.
  - When the ballot is greater, or noop_match(p, pm) holds, reply with:
    * ack_prepare_op, carrying the accepted value's ballot, type and
      app_data, if a value has been accepted; or
    * ack_prepare_empty_op otherwise.
  - Otherwise the prepare is ignored.
  If copying the accepted app_data fails, no reply is sent.
*/
static void	handle_simple_prepare(site_def const * site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
{
	int	greater;

	if (finished(p)) { /* We have learned a value */
		MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(finished(p), d));
		teach_ignorant_node(site, p, pm, synode, reply_queue);
		return;
	}

	greater = gt_ballot(pm->proposal, p->acceptor.promise); /* Paxos acceptor phase 1 decision */
	MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(greater, d));
	if (!greater && !noop_match(p, pm))
		return;

	{
		CREATE_REPLY(pm);
		reply->synode = synode;
		if (greater)
			p->acceptor.promise = pm->proposal; /* promise to not accept any less */

		if (!accepted(p)) {
			MAY_DBG(FN; STRLIT(" no value synode "); SYCEXP(synode));
			reply->op = ack_prepare_empty_op;
		} else { /* We have accepted a value */
			reply->proposal = p->acceptor.msg->proposal;
			reply->msg_type = p->acceptor.msg->msg_type;
			MAY_DBG(FN; STRLIT(" already accepted value "); SYCEXP(synode));
			reply->op = ack_prepare_op;
			safe_app_data_copy(&reply, p->acceptor.msg->a);
			if (reply == NULL)
				return; /* Failed to allocate memory for the copy. */
		}
		SEND_REPLY;
	}
}
3051 
3052 
/*
  Handle incoming prepare.
  Records the event and debug output for prepare `pm`, then delegates the
  Paxos acceptor phase 1 decision for instance pm->synode to
  handle_simple_prepare().
*/
static void	handle_prepare(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
{
		ADD_EVENTS(
			add_synode_event(p->synode);
			add_event(string_arg("pm->from"));
			add_event(int_arg(pm->from));
			add_event(string_arg(pax_op_to_str(pm->op)));
		);
#if 0
	DBGOUT(FN;
	    NDBG(pm->from, d); NDBG(pm->to, d);
	    SYCEXP(pm->synode);
	    BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
	MAY_DBG(FN; BALCEXP(pm->proposal);
	    BALCEXP(p->acceptor.promise);
	    if (p->acceptor.msg)
	    BALCEXP(p->acceptor.msg->proposal);
	    STRLIT("type "); STRLIT(pax_msg_type_to_str(pm->msg_type)))         ;

	handle_simple_prepare(site, p, pm, pm->synode, reply_queue);
}
3076 
3077 
/*
  If prep_majority() reports enough answers to this machine's prepare,
  send the proposal. The proposer message is stamped with the current
  ballot and instance, the accept-reply nodeset (prop_nodeset) is cleared,
  the message goes out via propose_msg(), and the ballot is recorded in
  sent_prop.
*/
static void	check_propose(site_def const * site, pax_machine *p)
{
	MAY_DBG(FN; SYCEXP(p->synode);
	    COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(site)));
	    );
	PAX_MSG_SANITY_CHECK(p->proposer.msg);
	if (!prep_majority(site, p))
		return;

	{
		pax_msg *const msg = p->proposer.msg;

		msg->proposal = p->proposer.bal;
		msg->synode = p->synode;
		/* Start collecting accept replies afresh for this ballot. */
		BIT_ZERO(p->proposer.prop_nodeset);
		propose_msg(msg);
	}
	p->proposer.sent_prop = p->proposer.bal;
}
3092 
3093 
/*
  If this node has a node number in `site` and prop_majority() reports
  enough accept answers, broadcast the learn message for machine `p`.
  The receivers bitset is rebuilt from the nodes that answered the prepare
  (prep_nodeset), plus this node. tiny_learn_msg() is used when
  no_duplicate_payload is set, otherwise learn_msg(). The ballot is recorded
  in sent_learn.
*/
static void	check_learn(site_def const * site, pax_machine *p)
{
	pax_msg *msg;

	MAY_DBG(FN; SYCEXP(p->synode);
	    COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(site)));
	    );
	PAX_MSG_SANITY_CHECK(p->proposer.msg);
	if (get_nodeno(site) == VOID_NODE_NO || !prop_majority(site, p))
		return;

	msg = p->proposer.msg;
	msg->synode = p->synode;

	/* Receivers: every node that answered our prepare, plus ourselves. */
	if (msg->receivers)
		free_bit_set(msg->receivers);
	msg->receivers = clone_bit_set(p->proposer.prep_nodeset);
	BIT_SET(get_nodeno(site), msg->receivers);

	if (no_duplicate_payload)
		tiny_learn_msg(site, msg);
	else
		learn_msg(site, msg);
	p->proposer.sent_learn = p->proposer.bal;
}
3113 
3114 
/*
  Record `m` as the decided (learned) value of machine `p`.
  Marks m's app_data, if any, as chosen and installs `m` as both the
  acceptor's and the learner's message. It then adds the machine's size to
  the cache-size accounting and calls shrink_cache().
  `site` is only referenced from debug output, hence MY_ATTRIBUTE((unused)).
*/
static void	do_learn(site_def const * site MY_ATTRIBUTE((unused)), pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
	/* FN; SYCEXP(p->synode); SYCEXP(m->synode); STRLIT(NEWLINE); */
	MAY_DBG(FN; SYCEXP(p->synode); SYCEXP(m->synode);
	    dbg_bitset(m->receivers, get_maxnodes(site));
	    );
	if (m->a)
		m->a->chosen = TRUE;
	/* The same message becomes both the accepted and the learned value. */
	replace_pax_msg(&p->acceptor.msg, m);
	replace_pax_msg(&p->learner.msg, m);
	/*
	   Track memory used by client data in the cache.
	   If we do not care about instances that are being decided,
	   it is only necessary to compute the added memory when we
	   record the outcome of a consensus round.
	*/
	add_cache_size(pax_machine_size(p));
	/* Shrink the cache size if necessary */
	shrink_cache();
}
3141 
3142 
/*
  Record that node m->from answered this machine's prepare, by setting its
  bit in p->proposer.prep_nodeset. Nothing is recorded when this node has
  no node number (VOID_NODE_NO) in `site`.
  `site` is genuinely used here, so it is not tagged MY_ATTRIBUTE((unused)).
*/
static void	handle_simple_ack_prepare(site_def const * site, pax_machine *p, pax_msg *m)
{
	if (get_nodeno(site) != VOID_NODE_NO) {
		BIT_SET(m->from, p->proposer.prep_nodeset);
	}
}
3148 
3149 
3150 /* Other node has already accepted a value */
handle_ack_prepare(site_def const * site,pax_machine * p,pax_msg * m)3151 static void	handle_ack_prepare(site_def const * site, pax_machine *p, pax_msg *m)
3152 {
3153 	ADD_EVENTS(
3154 		add_synode_event(p->synode);
3155 		add_event(string_arg("m->from"));
3156 		add_event(int_arg(m->from));
3157 		add_event(string_arg(pax_op_to_str(m->op)));
3158 	);
3159 #if 0
3160 	DBGOUT(FN;
3161 	    NDBG(pm->from, d); NDBG(pm->to, d);
3162 	    SYCEXP(pm->synode);
3163 	    BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
3164 #endif
3165 	assert(m);
3166 	MAY_DBG(FN;
3167 	    if (p->proposer.msg)
3168 	    BALCEXP(p->proposer.msg->proposal);
3169 	    BALCEXP(p->proposer.bal);
3170 	    BALCEXP(m->reply_to);
3171 	    BALCEXP(p->proposer.sent_prop);
3172 	    SYCEXP(m->synode))		;
3173 	if (m->from != VOID_NODE_NO && eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
3174 		handle_simple_ack_prepare(site, p, m);
3175 		if (gt_ballot(m->proposal, p->proposer.msg->proposal)) { /* greater */
3176 			replace_pax_msg(&p->proposer.msg, m);
3177 			assert(p->proposer.msg);
3178 		}
3179 		if (gt_ballot(m->reply_to, p->proposer.sent_prop))
3180 			check_propose(site, p);
3181 	}
3182 }
3183 
3184 
/*
  Other node has not already accepted a value.

  Handle an ack_prepare_empty_op from node m->from. Only answers from a
  real node (not VOID_NODE_NO) to our current ballot
  (m->reply_to == p->proposer.bal) are counted. The sender is added to the
  prepare nodeset, and check_propose() runs unless a proposal for a ballot
  at least as large as m->reply_to has already been sent
  (p->proposer.sent_prop).
  NOTE: the disabled #if 0 block below refers to `pm`, which is not in
  scope in this function; it would not compile if re-enabled as is.
*/
static void	handle_ack_prepare_empty(site_def const * site, pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
#if 0
	DBGOUT(FN;
	    NDBG(pm->from, d); NDBG(pm->to, d);
	    SYCEXP(pm->synode);
	    BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
	MAY_DBG(FN;
	    if (p->proposer.msg)
	    BALCEXP(p->proposer.msg->proposal);
	    BALCEXP(p->proposer.bal);
	    BALCEXP(m->reply_to);
	    BALCEXP(p->proposer.sent_prop);
	    SYCEXP(m->synode))		;
	if (m->from != VOID_NODE_NO && eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
		handle_simple_ack_prepare(site, p, m);
		if (gt_ballot(m->reply_to, p->proposer.sent_prop))
			check_propose(site, p);
	}
}
3213 
3214 
3215 /* #define AUTO_MSG(p,synode) {if(!(p)){replace_pax_msg(&(p), pax_msg_new(synode, site));} */
3216 
/*
  Paxos acceptor, phase 2 (accept), for instance `synode`.
  - If a value has already been learned, teach it to the sender instead.
  - Otherwise `m` is accepted when our promise is not greater than its
    ballot, or when noop_match(p, m) holds. It is stored as the acceptor's
    message and answered with ack_accept_op.
  - Otherwise the accept is ignored.
*/
static void	handle_simple_accept(site_def const * site, pax_machine *p, pax_msg *m, synode_no synode, linkage *reply_queue)
{
	if (finished(p)) { /* We have learned a value */
		teach_ignorant_node(site, p, m, synode, reply_queue);
		return;
	}

	/* Paxos acceptor phase 2 decision: reject only if we promised a higher ballot. */
	if (gt_ballot(p->acceptor.promise, m->proposal) && !noop_match(p, m))
		return;

	MAY_DBG(FN; SYCEXP(m->synode); STRLIT("accept "); BALCEXP(m->proposal));
	replace_pax_msg(&p->acceptor.msg, m);
	{
		CREATE_REPLY(m);
		reply->op = ack_accept_op;
		reply->synode = synode;
		SEND_REPLY;
	}
}
3233 
3234 
/*
  Accept value if promise is not greater.
  Entry point for an incoming accept `m` for machine `p`. It sanity-checks
  `m`, records the event, and delegates the Paxos acceptor phase 2
  decision for instance m->synode to handle_simple_accept().
*/
static void	handle_accept(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *m)
{
	MAY_DBG(FN;
	    BALCEXP(p->acceptor.promise);
	    BALCEXP(m->proposal);
	    STREXP(pax_msg_type_to_str(m->msg_type)));
	PAX_MSG_SANITY_CHECK(m);
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);

	handle_simple_accept(site, p, m, m->synode, reply_queue);
}
3252 
3253 
/*
  Handle answer to accept.
  Node m->from acknowledged our accept. The answer is counted only if this
  node has a node number in `site`, the sender is a real node (not
  VOID_NODE_NO), and it answers our current ballot
  (m->reply_to == p->proposer.bal). Counting means the sender is added to
  the accept nodeset (prop_nodeset). check_learn() then runs unless a learn
  for a ballot at least as large as m->proposal has already been sent
  (p->proposer.sent_learn).
*/
static void	handle_ack_accept(site_def const * site, pax_machine *p, pax_msg *m)
{
	ADD_EVENTS(
		add_synode_event(p->synode);
		add_event(string_arg("m->from"));
		add_event(int_arg(m->from));
		add_event(string_arg(pax_op_to_str(m->op)));
	);
	MAY_DBG(FN; SYCEXP(m->synode); BALCEXP(p->proposer.bal); BALCEXP(p->proposer.sent_learn); BALCEXP(m->proposal); BALCEXP(m->reply_to);
	    );
	MAY_DBG(FN; SYCEXP(p->synode);
	    if (p->acceptor.msg)
	    BALCEXP(p->acceptor.msg->proposal);
	    BALCEXP(p->proposer.bal);
	    BALCEXP(m->reply_to);
	    )		;
	if (get_nodeno(site) != VOID_NODE_NO && m->from != VOID_NODE_NO &&
      eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my accept */
		BIT_SET(m->from, p->proposer.prop_nodeset);
		if (gt_ballot(m->proposal, p->proposer.sent_learn))
			check_learn(site, p);
	}
}
3278 
3279 /* Configure all messages in interval start, end to be forced */
force_interval(synode_no start,synode_no end)3280 static void force_interval(synode_no start, synode_no end)
3281 {
3282 	while (synode_lt(start, end)) {
3283 		pax_machine * p = get_cache(start);
3284 		if(get_nodeno(find_site_def(start)) == VOID_NODE_NO)
3285 			break;
3286 		/* if(! finished(p)) */
3287 		p->force_delivery = 1;
3288 		/* Old nodesets are null and void */
3289 		BIT_ZERO(p->proposer.prep_nodeset);
3290 		BIT_ZERO(p->proposer.prep_nodeset);
3291 		start = incr_synode(start);
3292 	}
3293 }
3294 
/*
  Make `s` the forced configuration and force every instance currently in
  the pipeline.
  - max_synode is first raised, if needed, to the event horizon derived
    from s->boot_key.
  - The previous forced_config is freed and replaced by `s`, so this
    function takes ownership of `s`.
  - Every instance from executed_msg up to max_synode is then forced.
*/
static void start_force_config(site_def *s)
{
	synode_no end = s->boot_key;

	synode_set_to_event_horizon(&end);
	DBGOUT(FN; SYCEXP(executed_msg); SYCEXP(end));

	/* Make sure the pipeline extends at least to the new config's horizon. */
	if (synode_gt(end, max_synode))
		set_max_synode(end);

	/* Swap in the new forced configuration. */
	free_site_def(forced_config);
	forced_config = s;

	/* Force everything in the pipeline */
	force_interval(executed_msg, max_synode);
}
3309 
3310 
/*
  Learn this value.

  Handle learn message `m` for machine `p`. If no value has been learned
  yet, `m` is recorded via do_learn(), and then:
  - a unified_boot_type payload triggers the xa_net_boot FSM action;
  - if `m` is marked force_delivery and carries app_data:
    * a contained add-node, remove-node or forced configuration is
      installed immediately via start_force_config();
    * every instance from executed_msg up to the start of the new
      configuration (getstart(m->a)) is marked as forced.
  Tasks waiting on p->rv are woken in every case, including a re-learn.
*/
static void	handle_learn(site_def const * site, pax_machine *p, pax_msg *m)
{
	MAY_DBG(FN; STRLIT("proposer nodeset ");
	    dbg_bitset(p->proposer.prop_nodeset, get_maxnodes(site));
	    );
	MAY_DBG(FN; STRLIT("receivers ");
	    dbg_bitset(m->receivers, get_maxnodes(site));
	    );
	MAY_DBG(FN; NDBG(task_now(), f); SYCEXP(p->synode);
	    COPY_AND_FREE_GOUT(dbg_app_data(m->a));
	    );

	PAX_MSG_SANITY_CHECK(m);

	if (!finished(p)) { /* Avoid re-learn */
		do_learn(site, p, m);
		/* Check for special messages */
		if(m->a && m->a->body.c_t == unified_boot_type){
			DBGOUT(FN; STRLIT("Got unified_boot "); SYCEXP(p->synode); SYCEXP(m->synode); );
			XCOM_FSM(xa_net_boot, void_arg(m->a));
		}
		/* See if someone is forcing a new config */
		if(m->force_delivery && m->a){
			DBGOUT(FN; STRLIT("Got forced config "); SYCEXP(p->synode); SYCEXP(m->synode); );
			/* Configure all messages from executed_msg until start of new config
			   as forced messages so they will eventually be finished */
			/* Immediately install this new config */
			switch (m->a->body.c_t) {
			case add_node_type:
/* purecov: begin deadcode */
				start_force_config(clone_site_def(handle_add_node(m->a)));
				break;
/* purecov: end */
			case remove_node_type:
/* purecov: begin deadcode */
				start_force_config(clone_site_def(handle_remove_node(m->a)));
				break;
/* purecov: end */
			case force_config_type:
				start_force_config(clone_site_def(install_node_group(m->a)));
				break;
			default:
				break;
			}
			/* Runs for every forced message with app_data, whatever its cargo type. */
			force_interval(executed_msg, getstart(m->a));
		}
	}

	task_wakeup(&p->rv);
}
3362 
3363 
3364 /* Skip this value */
handle_skip(site_def const * site,pax_machine * p,pax_msg * m)3365 static void	handle_skip(site_def const * site, pax_machine *p, pax_msg *m)
3366 {
3367 	/*   MAY_DBG(FN;); */
3368 	/*   MAY_DBG(FN; NDBG(task_now(),f); SYCEXP(p->msg->synode)); */
3369 	if (!finished(p)) {
3370 		skip_value(m);
3371 		do_learn(site, p, m);
3372 	}
3373 	/*   MAY_DBG(FN; STRLIT("taskwakeup "); SYCEXP(p->msg->synode)); */
3374 	task_wakeup(&p->rv);
3375 }
3376 
3377 
/*
  Queue client message `p` on prop_input_queue for the proposer.
  Messages that are NULL or carry no app_data are discarded.
*/
static void	handle_client_msg(pax_msg *p)
{
	msg_link *ml;

	/* discard invalid message */
	if (p == NULL || p->a == NULL)
		return;

	/* Put it in the proposer queue */
	ml = msg_link_new(p, VOID_NODE_NO);
	ADD_T_EV(task_now(), __FILE__, __LINE__, "handle_client_msg");
	channel_put(&prop_input_queue, &ml->l);
}
3392 
/* Time (task_now()) at which need_boot_op was last sent from handle_alive. */
static double	sent_alive = 0.0;

/*
  Handle incoming alive message `pm`.

  This node replies with need_boot_op when all of the following hold:
  - client boot is not done yet;
  - the message is not from ourselves;
  - the message's group is not a dead site.
  Replies are rate-limited to one per second via sent_alive.
*/
static inline void	handle_alive(site_def const * site, linkage *reply_queue, pax_msg *pm)
{
	int not_to_oneself = (pm->from != get_nodeno(site) && pm->from != pm->to);
	DBGOUT(FN; SYCEXP(pm->synode); NDBG(pm->from,u); NDBG(pm->to,u); );

	/*
	  This code will check if the ping is intended to us.
	  If the encoded node does not exist in the current configuration,
	  we avoid sending need_boot_op, since it must be from a different
	  reincarnation of this node.
	  The node list arrives in an incoming message, so an empty list must
	  not be indexed; it is treated as "node does not exist".
	*/
	if(site && pm->a && pm->a->body.c_t == xcom_boot_type)
	{
		DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&pm->a->body.app_u_u.nodes)););
		if (pm->a->body.app_u_u.nodes.node_list_len == 0) {
			not_to_oneself = 0;
		} else {
			not_to_oneself &=
				node_exists_with_uid(&pm->a->body.app_u_u.nodes.node_list_val[0], &get_site_def()->nodes);
		}
	}


	if (!client_boot_done && /* Already done? */
	    not_to_oneself && /* Not to oneself */
	    !is_dead_site(pm->group_id)) { /* Avoid dealing with zombies */
		double	t = task_now();
		if (t - sent_alive > 1.0) {
			CREATE_REPLY(pm);
			reply->op = need_boot_op;
			SEND_REPLY;
			sent_alive = t;
			DBGOUT(FN; STRLIT("sent need_boot_op"); );
		}
	}
}
3427 
3428 
/*
  Possibly advance max_synode from incoming message `p`.
  - Messages whose group is a dead site are ignored.
  - If the current site's group id is 0, or max_synode has group id 0,
    max_synode is set to p->synode unconditionally.
  - Otherwise, only messages from max_synode's group count: max_synode is
    raised to p->synode, and then to p->max_synode, whenever either is
    greater.
*/
static void	update_max_synode(pax_msg *p)
{
	if (is_dead_site(p->group_id))
		return;

	if (get_group_id(get_site_def()) == 0 || max_synode.group_id == 0) {
		set_max_synode(p->synode);
		return;
	}

	if (max_synode.group_id != p->synode.group_id)
		return;

	if (synode_gt(p->synode, max_synode))
		set_max_synode(p->synode);
	if (synode_gt(p->max_synode, max_synode))
		set_max_synode(p->max_synode);
}
3458 
3459 
3460 /* Add app_data to message cache */
3461 /* purecov: begin deadcode */
/*
  Learn app_data `a` directly into the cache entry for `synode`.
  A new message holding a copy of `a` is created, turned into a learn
  message, and recorded via do_learn(). If copying `a` fails, the message
  has already been released by safe_app_data_copy() and nothing is learned.
*/
void	add_to_cache(app_data_ptr a, synode_no synode)
{
	pax_machine *machine = get_cache(synode);
	pax_msg *msg = pax_msg_new_0(synode);

	ref_msg(msg);
	assert(machine);

	safe_app_data_copy(&msg, a);
	if (msg == NULL)
		return;

	set_learn_type(msg);
	do_learn(0, machine, msg);
	unref_msg(&msg);
}
3476 /* purecov: end */
3477 
3478 /* }}} */
3479 
3480 /* {{{ Message dispatch */
/* printf-style format for a ballot; BAL_MEM(x) expands to its two arguments (cnt, node). */
#define BAL_FMT "ballot {cnt %d node %d}"
#define BAL_MEM(x) (x).cnt, (x).node

/* Count of client_msg operations handled by dispatch_op(). */
static int clicnt = 0;
3485 
is_reincarnation_adding(app_data_ptr a)3486 static u_int is_reincarnation_adding(app_data_ptr a)
3487 {
3488 	/* Get information on the current site definition */
3489 	const site_def* new_site_def= get_site_def();
3490 	const site_def* valid_site_def= find_site_def(executed_msg);
3491 
3492 	/* Get information on the nodes to be added */
3493 	u_int nodes_len  = a->body.app_u_u.nodes.node_list_len;
3494 	node_address* nodes_to_change= a->body.app_u_u.nodes.node_list_val;
3495 
3496 	u_int i = 0;
3497 	for(; i < nodes_len; i++)
3498 	{
3499 		if (node_exists(&nodes_to_change[i], &new_site_def->nodes) ||
3500 			node_exists(&nodes_to_change[i], &valid_site_def->nodes))
3501 		{
3502 			/*
3503 			We are simply ignoring the attempt to add a node to the
3504 			group when there is an old incarnation of it, meaning
3505 			that the node has crashed and restarted so fastly that
3506 			nobody has noticed that it has gone.
3507 
3508 			In XCOM, the group is not automatically reconfigured
3509 			and it is possible to start reusing a node that has
3510 			crashed and restarted without reconfiguring the group
3511 			by adding the node back to it.
3512 
3513 			However, this operation may be unsafe because XCOM
3514 			does not implement a crash-recovery model and nodes
3515 			suffer from amnesia after restarting the service. In
3516 			other words this may lead to inconsistency issues in
3517 			the paxos protocol.
3518 
3519 			Unfortunately, preventing that a node is added back
3520 			to the system where there is an old incarnation will
3521 			not fix this problem since other changes are required.
3522 			*/
3523 			G_MESSAGE("Old incarnation found while trying to add node %s %.*s.",
3524 				  nodes_to_change[i].address,
3525 				  nodes_to_change[i].uuid.data.data_len,
3526 				  nodes_to_change[i].uuid.data.data_val
3527 			);
3528 			return 1;
3529 		}
3530 	}
3531 
3532 	return 0;
3533 }
3534 
is_reincarnation_removing(app_data_ptr a)3535 static u_int is_reincarnation_removing(app_data_ptr a)
3536 {
3537 	/* Get information on the current site definition */
3538 	const site_def* new_site_def= get_site_def();
3539 
3540 	/* Get information on the nodes to be added */
3541 	u_int nodes_len  = a->body.app_u_u.nodes.node_list_len;
3542 	node_address* nodes_to_change= a->body.app_u_u.nodes.node_list_val;
3543 
3544 	u_int i = 0;
3545 	for(; i < nodes_len; i++)
3546 	{
3547 		if (!node_exists_with_uid(&nodes_to_change[i], &new_site_def->nodes))
3548 		{
3549 			/*
3550 			We cannot allow an upper-layer to remove a new incarnation
3551 			of a node, when it tries to remove an old one.
3552 			*/
3553 			G_MESSAGE("Old incarnation found while trying to "
3554 				  "remove node %s %.*s.",
3555 				  nodes_to_change[i].address,
3556 				  nodes_to_change[i].uuid.data.data_len,
3557 				  nodes_to_change[i].uuid.data.data_val
3558 			);
3559 
3560 			return 1;
3561 		}
3562 	}
3563 
3564 	return 0;
3565 }
3566 
3567 /**
3568  * Logs the fact that an add/remove node request is aimed at another group.
3569  *
3570  * @param a a pointer to the app_data of the configuration command
3571  * @param message_fmt a formatted message to log, containing a single %s that will be replaced by the node's address
3572  */
log_cfgchange_wrong_group(app_data_ptr a,const char * const message_fmt)3573 static void log_cfgchange_wrong_group(app_data_ptr a, const char *const message_fmt)
3574 {
3575 	u_int const nr_nodes = a->body.app_u_u.nodes.node_list_len;
3576 	u_int i;
3577 	for (i = 0; i < nr_nodes; i++)
3578 	{
3579 		char const *const address = a->body.app_u_u.nodes.node_list_val[i].address;
3580 		G_WARNING(message_fmt, address);
3581 	}
3582 }
3583 
3584 /**
3585  * Validates if a configuration command can be executed.
3586  * Checks whether the configuration command is aimed at the correct group.
3587  * Checks whether the configuration command pertains to a node reincarnation.
3588  *
3589  * @param p a pointer to the pax_msg of the configuration command
3590  * @retval REQUEST_OK if the reconfiguration command can be executed
3591  * @retval REQUEST_RETRY if XCom is still booting
3592  * @retval REQUEST_FAIL if the configuration command cannot be executed
3593  */
can_execute_cfgchange(pax_msg * p)3594 static client_reply_code can_execute_cfgchange(pax_msg *p)
3595 {
3596 	app_data_ptr a = p->a;
3597 
3598 	if (executed_msg.msgno <= 2)
3599 		return REQUEST_RETRY;
3600 
3601 	if (a && a->group_id != 0 && a->group_id != executed_msg.group_id)
3602 	{
3603 		switch (a->body.c_t) {
3604 		case add_node_type:
3605 			log_cfgchange_wrong_group(a, "The request to add %s to the group has been rejected because it is aimed at another group");
3606 			break;
3607 		case remove_node_type:
3608 			log_cfgchange_wrong_group(a, "The request to remove %s from the group has been rejected because it is aimed at another group");
3609 			break;
3610 		case force_config_type:
3611 			G_WARNING("The request to force the group membership has been rejected because it is aimed at another group");
3612 			break;
3613 		default:
3614 			assert(0 && "A cargo_type different from {add_node_type, remove_node_type, force_config_type} should not have hit this code path");
3615 		}
3616 		return REQUEST_FAIL;
3617 	}
3618 
3619         if (a && a->body.c_t == add_node_type && is_reincarnation_adding(a))
3620 		return REQUEST_FAIL;
3621 
3622         if (a && a->body.c_t == remove_node_type && is_reincarnation_removing(a))
3623 		return REQUEST_FAIL;
3624 
3625 	return REQUEST_OK;
3626 }
3627 
activate_sweeper()3628 static void activate_sweeper()
3629 {
3630 	if (sweeper) {
3631 		ADD_EVENTS(
3632 			add_event(string_arg("sweeper activated max_synode"));
3633 			add_synode_event(max_synode);
3634 		);
3635 		task_activate(sweeper);
3636 	}
3637 }
3638 
dispatch_op(site_def const * site,pax_msg * p,linkage * reply_queue)3639 pax_msg *dispatch_op(site_def const *site, pax_msg *p, linkage *reply_queue)
3640 {
3641 	pax_machine * pm = NULL;
3642 	site_def * dsite = find_site_def_rw(p->synode);
3643 	int	in_front = too_far(p->synode);
3644 
3645 	if(p->force_delivery){
3646 		/* Ensure that forced message can be processed */
3647 		in_front = 0;
3648 	}
3649 
3650 	if (dsite && p->op != client_msg){
3651 		note_detected(dsite, p->from);
3652 		update_delivered(dsite, p->from, p->delivered_msg);
3653 	}
3654 
3655 	MAY_DBG(FN; STRLIT("incoming message ");
3656 			COPY_AND_FREE_GOUT(dbg_pax_msg(p));
3657 	    );
3658 	ADD_EVENTS(
3659 		add_synode_event(p->synode);
3660 		add_event(string_arg("p->from"));
3661 		add_event(int_arg(p->from));
3662 		add_event(string_arg(pax_op_to_str(p->op)));
3663 	);
3664 	switch (p->op) {
3665 	case client_msg:
3666 		clicnt++;
3667 		if (p->a && (p->a->body.c_t == enable_arbitrator)) {
3668 			CREATE_REPLY(p);
3669 			DBGOUT(FN; STRLIT("Got enable_arbitrator from client"); SYCEXP(p->synode); );
3670 			ARBITRATOR_HACK = 1;
3671 			reply->op = xcom_client_reply;
3672 			reply->cli_err = REQUEST_OK;
3673 			SEND_REPLY;
3674 			break;
3675 		}
3676 		if (p->a && (p->a->body.c_t == disable_arbitrator)) {
3677 			CREATE_REPLY(p);
3678 			DBGOUT(FN; STRLIT("Got disable_arbitrator from client"); SYCEXP(p->synode); );
3679 			ARBITRATOR_HACK = 0;
3680 			reply->op = xcom_client_reply;
3681 			reply->cli_err = REQUEST_OK;
3682 			SEND_REPLY;
3683 			break;
3684 		}
3685 		if (p->a && (p->a->body.c_t == set_cache_limit)) {
3686 			CREATE_REPLY(p);
3687 			DBGOUT(FN; STRLIT("Got set_cache_limit from client"); SYCEXP(p->synode); );
3688 			if(the_app_xcom_cfg){
3689 				set_max_cache_size(p->a->body.app_u_u.cache_limit);
3690 				reply->cli_err = REQUEST_OK;
3691 			}else{
3692 				reply->cli_err = REQUEST_FAIL;
3693 			}
3694 			reply->op = xcom_client_reply;
3695 			SEND_REPLY;
3696 			break;
3697 		}
3698 		if (p->a && (p->a->body.c_t == x_terminate_and_exit)) {
3699 			CREATE_REPLY(p);
3700 			DBGOUT(FN; STRLIT("Got terminate_and_exit from client"); SYCEXP(p->synode); );
3701 			reply->op = xcom_client_reply;
3702 			reply->cli_err = REQUEST_OK;
3703 			SEND_REPLY;
3704 			/*
3705 			  The function frees sites which is used by SEND_REPLY,
3706 			  so it should be called after SEND_REPLY.
3707 			*/
3708 			terminate_and_exit();
3709 			break;
3710 		}
3711 		if (p->a && (p->a->body.c_t == add_node_type ||
3712 					 p->a->body.c_t == remove_node_type ||
3713 					 p->a->body.c_t == force_config_type)) {
3714 			client_reply_code cli_err;
3715 			CREATE_REPLY(p);
3716 			reply->op = xcom_client_reply;
3717 			reply->cli_err = cli_err = can_execute_cfgchange(p);
3718 			SEND_REPLY;
3719 			if (cli_err != REQUEST_OK) {
3720 				break;
3721 			}
3722 		}
3723 		if (p->a && p->a->body.c_t == unified_boot_type) {
3724 			DBGOUT(FN; STRLIT("Got unified_boot from client"); SYCEXP(p->synode); );
3725 			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3726 			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3727 			XCOM_FSM(xa_net_boot, void_arg(p->a));
3728 		}
3729 		if (p->a && p->a->body.c_t == add_node_type) {
3730 			DBGOUT(FN; STRLIT("Got add_node from client"); SYCEXP(p->synode); );
3731 			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3732 			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3733 			assert(get_site_def());
3734 		}
3735 		if (p->a && p->a->body.c_t == remove_node_type) {
3736 			DBGOUT(FN; STRLIT("Got remove_node from client"); SYCEXP(p->synode); );
3737 			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3738 			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3739 			assert(get_site_def());
3740 		}
3741 		if (p->a && p->a->body.c_t == force_config_type) {
3742 			DBGOUT(FN; STRLIT("Got new config from client"); SYCEXP(p->synode); );
3743 			DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3744 			DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3745 			assert(get_site_def());
3746 			XCOM_FSM(xa_force_config, void_arg(p->a));
3747 		}
3748 		handle_client_msg(p);
3749 		break;
3750 	case initial_op:
3751 		break;
3752 	case read_op:
3753 		pm = get_cache(p->synode);
3754 		assert(pm);
3755 		if(client_boot_done)
3756 			handle_alive(site, reply_queue, p);
3757 		handle_read(site, pm, reply_queue, p);
3758 		break;
3759 	case prepare_op:
3760 		pm = get_cache(p->synode);
3761 		assert(pm);
3762 		if(p->force_delivery)
3763 			pm->force_delivery = 1;
3764 		pm->last_modified = task_now();
3765 		if(client_boot_done)
3766 			handle_alive(site, reply_queue, p);
3767 		handle_prepare(site, pm, reply_queue, p);
3768 		break;
3769 	case ack_prepare_op:
3770 		if (in_front || !is_cached(p->synode))
3771 			break;
3772 		pm = get_cache(p->synode);
3773 		if(p->force_delivery)
3774 			pm->force_delivery = 1;
3775 		if (!pm->proposer.msg)
3776 			break;
3777 		assert(pm && pm->proposer.msg);
3778 		handle_ack_prepare(site, pm, p);
3779 		break;
3780 	case ack_prepare_empty_op:
3781 		if (in_front || !is_cached(p->synode))
3782 			break;
3783 		pm = get_cache(p->synode);
3784 		if(p->force_delivery)
3785 			pm->force_delivery = 1;
3786 		if (!pm->proposer.msg)
3787 			break;
3788 		assert(pm && pm->proposer.msg);
3789 		handle_ack_prepare_empty(site, pm, p);
3790 		break;
3791 	case accept_op:
3792 		pm = get_cache(p->synode);
3793 		assert(pm);
3794 		if(p->force_delivery)
3795 			pm->force_delivery = 1;
3796 		pm->last_modified = task_now();
3797 		handle_alive(site, reply_queue, p);
3798 		handle_accept(site, pm, reply_queue, p);
3799 		break;
3800 	case ack_accept_op:
3801 		if (in_front || !is_cached(p->synode))
3802 			break;
3803 		pm = get_cache(p->synode);
3804 		if(p->force_delivery)
3805 			pm->force_delivery = 1;
3806 		if (!pm->proposer.msg)
3807 			break;
3808 		assert(pm && pm->proposer.msg);
3809 		handle_ack_accept(site, pm, p);
3810 		break;
3811 	case recover_learn_op:
3812 		DBGOUT(FN; STRLIT("recover_learn_op receive "); SYCEXP(p->synode));
3813 		pm = get_cache(p->synode);
3814 		assert(pm);
3815 		if(p->force_delivery)
3816 			pm->force_delivery = 1;
3817 		pm->last_modified = task_now();
3818 		update_max_synode(p);
3819 		{
3820 			DBGOUT(FN; STRLIT("recover_learn_op learn "); SYCEXP(p->synode));
3821 			p->op = learn_op;
3822 			handle_learn(site, pm, p);
3823 		}
3824 		break;
3825 	case learn_op:
3826 learnop:
3827 		pm = get_cache(p->synode);
3828 		assert(pm);
3829 		if(p->force_delivery)
3830 			pm->force_delivery = 1;
3831 		pm->last_modified = task_now();
3832 		update_max_synode(p);
3833 		activate_sweeper();
3834 		handle_learn(site, pm, p);
3835 		break;
3836 	case tiny_learn_op:
3837 		if (p->msg_type == no_op)
3838 			goto learnop;
3839 		pm = get_cache(p->synode);
3840 		assert(pm);
3841 		if(p->force_delivery)
3842 			pm->force_delivery = 1;
3843 		if (pm->acceptor.msg) {
3844 			/* 			BALCEXP(pm->acceptor.msg->proposal); */
3845 			if (eq_ballot(pm->acceptor.msg->proposal, p->proposal)) {
3846 				pm->acceptor.msg->op = learn_op;
3847 				pm->last_modified = task_now();
3848 				update_max_synode(p);
3849 				activate_sweeper();
3850 				handle_learn(site, pm, pm->acceptor.msg);
3851 			} else {
3852 				send_read(p->synode);
3853 				DBGOUT(FN; STRLIT("tiny_learn"); SYCEXP(p->synode);
3854 				    BALCEXP(pm->acceptor.msg->proposal); BALCEXP(p->proposal));
3855 			}
3856 		} else {
3857 			send_read(p->synode);
3858 			DBGOUT(FN; STRLIT("tiny_learn"); SYCEXP(p->synode);
3859 			    BALCEXP(p->proposal));
3860 		}
3861 		break;
3862 	case skip_op:
3863 		pm = get_cache(p->synode);
3864 		assert(pm);
3865 		if(p->force_delivery)
3866 			pm->force_delivery = 1;
3867 		pm->last_modified = task_now();
3868 		handle_skip(site, pm, p);
3869 		break;
3870 	case i_am_alive_op:
3871 		handle_alive(site, reply_queue, p);
3872 		break;
3873 	case are_you_alive_op:
3874 		handle_alive(site, reply_queue, p);
3875 		break;
3876 	case need_boot_op:
3877 /* purecov: begin deadcode */
3878 		XCOM_FSM(xa_need_snapshot, void_arg(p));
3879 		break;
3880 /* purecov: end */
3881 	case snapshot_op:
3882 		if (!is_dead_site(p->group_id)) {
3883 			update_max_synode(p);
3884 		}
3885 		break;
3886 	case gcs_snapshot_op:
3887 		if (!is_dead_site(p->group_id)) {
3888 			update_max_synode(p);
3889 			XCOM_FSM(xa_snapshot, void_arg(p));
3890 			XCOM_FSM(xa_complete,int_arg(0));
3891 		}
3892 		break;
3893 	case die_op:
3894 		/* assert("die horribly" == "need coredump"); */
3895 		 {
3896 			GET_GOUT;
3897 			FN;
3898 			STRLIT("die_op ");
3899 			SYCEXP(executed_msg);
3900 			SYCEXP(delivered_msg);
3901 			SYCEXP(p->synode);
3902 			SYCEXP(p->delivered_msg);
3903 			SYCEXP(p->max_synode);
3904 			PRINT_GOUT;
3905 			FREE_GOUT;
3906 		}
3907 		/*
3908 		If the message with the number in  the  incoming  die_op  message
3909 		already  has  been  executed  (delivered),  then it means that we
3910 		actually got consensus on it, since otherwise we would  not  have
3911 		delivered it.Such a situation could arise if one of the nodes has
3912 		expelled the message from its cache, but others have not. So when
3913 		sending  out  a  request, we might get two different answers, one
3914 		indicating that we are too far behind  and  should  restart,  and
3915 		another  with  the  actual  consensus value. If the value arrives
3916 		first, we will deliver it, and then the die_op may arrive  later.
3917 		But  it this case it does not matter, since we got what we needed
3918 		anyway. It is only a partial guard against exiting without really
3919 		needing  it  of course, since the die_op may arrive first, and we
3920 		do not wait for a die_op from all the other nodes.  We  could  do
3921 		that  with  some extra housekeeping in the pax_machine (a new bit
3922 		vector), but I am not convinced that it is worth the effort.
3923 		*/
3924 		if(!synode_lt(p->synode, executed_msg)){
3925 			g_critical("Node %u unable to get messages, since the "
3926 				   "group is too far ahead. Node will now exit.",
3927 				get_nodeno(site));
3928 			terminate_and_exit();
3929 		}
3930 	default:
3931 		break;
3932 	}
3933 	if (oom_abort) {
3934 	  g_critical("Node %u has run out of memory and will now exit.",
3935 	             get_nodeno(site));
3936 	  terminate_and_exit();
3937 	}
3938 	return(p);
3939 }
3940 
3941 
3942 
3943 /* }}} */
3944 
3945 /* {{{ Acceptor-learner task */
acceptor_learner_task(task_arg arg)3946 int	acceptor_learner_task(task_arg arg)
3947 {
3948 	DECL_ENV
3949 	    connection_descriptor rfd;
3950 	srv_buf *in_buf;
3951 
3952 	pax_msg * p;
3953 	u_int	buflen;
3954 	char	*buf;
3955 	linkage reply_queue;
3956 	int	errors;
3957 	server *srv;
3958 	END_ENV;
3959 
3960 	TASK_BEGIN
3961 
3962 		ep->in_buf = calloc(1, sizeof(srv_buf));
3963 
3964 	ep->rfd.fd = get_int_arg(arg);
3965 #ifdef XCOM_HAVE_OPENSSL
3966 	ep->rfd.ssl_fd = 0;
3967 #endif
3968 	ep->p = NULL;
3969 	ep->buflen = 0;
3970 	ep->buf = NULL;
3971 	ep->errors = 0;
3972 	ep->srv = 0;
3973 
3974 	/* We have a connection, make socket non-blocking and wait for request */
3975 	unblock_fd(ep->rfd.fd);
3976 	set_nodelay(ep->rfd.fd);
3977 	wait_io(stack, ep->rfd.fd, 'r');
3978 	TASK_YIELD;
3979 
3980 #ifdef XCOM_HAVE_OPENSSL
3981 	if (xcom_use_ssl()) {
3982 		ep->rfd.ssl_fd = SSL_new(server_ctx);
3983 		SSL_set_fd(ep->rfd.ssl_fd, ep->rfd.fd);
3984 
3985 		{
3986 			int	ret_ssl;
3987 			int	err;
3988 			ERR_clear_error();
3989 			ret_ssl = SSL_accept(ep->rfd.ssl_fd);
3990 			err = SSL_get_error(ep->rfd.ssl_fd, ret_ssl);
3991 
3992 			while (ret_ssl != SSL_SUCCESS) {
3993 				if (err == SSL_ERROR_WANT_READ){
3994 					wait_io(stack, ep->rfd.fd, 'r');
3995 				} else if (err == SSL_ERROR_WANT_WRITE){
3996 					wait_io(stack, ep->rfd.fd, 'w');
3997 				} else { /* Some other error, give up */
3998 					break;
3999 				}
4000 				TASK_YIELD;
4001 				SET_OS_ERR(0);
4002 				G_DEBUG("acceptor learner accept retry fd %d", ep->rfd.fd);
4003 				ERR_clear_error();
4004 				ret_ssl = SSL_accept(ep->rfd.ssl_fd);
4005 				err = SSL_get_error(ep->rfd.ssl_fd, ret_ssl);
4006 			}
4007 
4008 			if (ret_ssl != SSL_SUCCESS) {
4009 				ssl_free_con(&ep->rfd);
4010 				close_connection(&ep->rfd);
4011 				TERMINATE;
4012 			}
4013 		}
4014 
4015 	} else {
4016 		ep->rfd.ssl_fd = 0;
4017 	}
4018 #endif
4019 	set_connected(&ep->rfd, CON_FD);
4020 	link_init(&ep->reply_queue,  type_hash("msg_link"));
4021 
4022 	while (!xcom_shutdown) {
4023 		int64_t	n;
4024 		site_def const * site = 0;
4025 		unchecked_replace_pax_msg(&ep->p, pax_msg_new_0(null_synode));
4026 
4027 		if(use_buffered_read){
4028 			TASK_CALL(buffered_read_msg(&ep->rfd, ep->in_buf, ep->p, ep->srv, &n));
4029 		}else{
4030 			TASK_CALL(read_msg(&ep->rfd, ep->p, ep->srv, &n));
4031 		}
4032 		if (((int)ep->p->op < (int)client_msg || ep->p->op > LAST_OP)) {
4033 			/* invalid operation, ignore message */
4034 			delete_pax_msg(ep->p);
4035 			ep->p = NULL;
4036 			TASK_YIELD;
4037 			continue;
4038 		}
4039 		if (n <= 0) {
4040 			break;
4041 		}
4042 		site = find_site_def(ep->p->synode);
4043 		/*
4044 			Getting a pointer to the server needs to be done after we have
4045 			received a message, since without having received a message, we
4046 			cannot know who it is from. We could peek at the message and de‐
4047 			serialize the message number and from field, but since the server
4048 			does not change, it should be sufficient to cache the server in
4049 			the acceptor_learner task. A cleaner solution would have been to
4050 			move the timestamps out of the server object, and have a map in‐
4051 			dexed by IP/port or UUID to track the timestamps, since this is
4052 			common to both the sender_task, reply_handler_task,  and the ac‐
4053 			ceptor_learner_task.
4054 		*/
4055 		// Allow the previous server reference to be freed.
4056 		if (ep->srv) srv_unref(ep->srv);
4057 		ep->srv = get_server(site, ep->p->from);
4058 		// Prevent the new server reference from being freed.
4059 		if (ep->srv) srv_ref(ep->srv);
4060 		ep->p->refcnt = 1; /* Refcnt from other end is void here */
4061 		MAY_DBG(FN;
4062 				NDBG(ep->rfd.fd, d); NDBG(task_now(), f);
4063 				COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p));
4064 				);
4065 		receive_count[ep->p->op]++;
4066 		receive_bytes[ep->p->op] += (uint64_t)n + MSG_HDR_SIZE;
4067 		{
4068 			gboolean behind = FALSE;
4069 			if (get_maxnodes(site) > 0) {
4070 				behind = ep->p->synode.msgno < delivered_msg.msgno;
4071 			}
4072 			ADD_EVENTS(
4073 				add_event(string_arg("before dispatch "));
4074 				add_synode_event(ep->p->synode);
4075 				add_event(string_arg("ep->p->from"));
4076 				add_event(int_arg(ep->p->from));
4077 				add_event(string_arg(pax_op_to_str(ep->p->op)));
4078 			);
4079 			if (ep->p->msg_type == normal ||
4080 			    ep->p->synode.msgno == 0 || /* Used by i-am-alive and so on */
4081 				is_cached(ep->p->synode) || /* Already in cache */
4082 				(!behind)) { /* Guard against cache pollution from other nodes */
4083 				dispatch_op(site, ep->p, &ep->reply_queue);
4084 
4085 				/* Send replies on same fd */
4086 				while (!link_empty(&ep->reply_queue)) {
4087 					msg_link * reply = (msg_link * )(link_extract_first(&ep->reply_queue));
4088 					MAY_DBG(FN;
4089 					    COPY_AND_FREE_GOUT(dbg_linkage(&ep->reply_queue));
4090 					    COPY_AND_FREE_GOUT(dbg_msg_link(reply));
4091 					    COPY_AND_FREE_GOUT(dbg_pax_msg(reply->p));
4092 					    );
4093 					assert(reply->p);
4094 					assert(reply->p->refcnt > 0);
4095 					reply->p->to = ep->p->from;
4096 					reply->p->from = ep->p->to;
4097 					reply->p->delivered_msg = get_delivered_msg();
4098 					reply->p->max_synode = get_max_synode();
4099 					serialize_msg(reply->p, ep->rfd.x_proto, &ep->buflen, &ep->buf);
4100 					MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_msg_link(reply));
4101 							COPY_AND_FREE_GOUT(dbg_pax_msg(reply->p)));
4102 					msg_link_delete(&reply);
4103 					if(ep->buflen){
4104 						int64_t	sent;
4105 						TASK_CALL(task_write(&ep->rfd , ep->buf, ep->buflen, &sent));
4106 						send_count[ep->p->op]++;
4107 						send_bytes[ep->p->op] += ep->buflen;
4108 						X_FREE(ep->buf);
4109 					}
4110 					ep->buf = NULL;
4111 				}
4112 			} else {
4113 				DBGOUT(FN; STRLIT("rejecting ");
4114 					   STRLIT(pax_op_to_str(ep->p->op));
4115 					   NDBG(ep->p->from, d); NDBG(ep->p->to, d);
4116 					   SYCEXP(ep->p->synode);
4117 					   BALCEXP(ep->p->proposal));
4118 				if (xcom_booted() && behind) {
4119 #ifdef USE_EXIT_TYPE
4120 					if (ep->p->op == prepare_op) {
4121 						miss_prepare(ep->p, &ep->reply_queue);
4122 					} else if (ep->p->op == accept_op) {
4123 						miss_accept(ep->p, &ep->reply_queue);
4124 					}
4125 #else
4126 					if (/*ep->p->op == prepare_op && */ was_removed_from_cache(ep->p->synode)) {
4127 						DBGOUT(FN; STRLIT("send_die ");
4128 							   STRLIT(pax_op_to_str(ep->p->op));
4129 							   NDBG(ep->p->from, d); NDBG(ep->p->to, d);
4130 							   SYCEXP(ep->p->synode);
4131 							   BALCEXP(ep->p->proposal));
4132 						if (get_maxnodes(site) > 0) {
4133 							pax_msg * np = NULL;
4134 							np = pax_msg_new(ep->p->synode, site);
4135 							ref_msg(np);
4136 							np->op = die_op;
4137 							np->to = ep->p->from;
4138 							np->from = ep->p->to;
4139 							np->delivered_msg = get_delivered_msg();
4140 							np->max_synode = get_max_synode();
4141 							DBGOUT(FN; STRLIT("sending die_op to node "); NDBG(np->to, d);
4142 								SYCEXP(executed_msg); SYCEXP(max_synode); SYCEXP(np->synode));
4143 							serialize_msg(np, ep->rfd.x_proto, &ep->buflen, &ep->buf);
4144 							if(ep->buflen){
4145 								int64_t	sent;
4146 								TASK_CALL(task_write(&ep->rfd , ep->buf, ep->buflen, &sent));
4147 								send_count[ep->p->op]++;
4148 								send_bytes[ep->p->op] += ep->buflen;
4149 								X_FREE(ep->buf);
4150 							}
4151 							ep->buf = NULL;
4152 							unref_msg(&np);
4153 						}
4154 
4155 					}
4156 #endif
4157 				}
4158 			}
4159 		}
4160 		/* TASK_YIELD; */
4161 	}
4162 
4163 	FINALLY
4164 	    MAY_DBG(FN; STRLIT(" shutdown "); NDBG(ep->rfd.fd, d ); NDBG(task_now(), f));
4165 	if(ep->reply_queue.suc && !link_empty(&ep->reply_queue))
4166 	    empty_msg_list(&ep->reply_queue);
4167 	unchecked_replace_pax_msg(&ep->p, NULL);
4168 	shutdown_connection(&ep->rfd);
4169 	DBGOUT(FN; NDBG(xcom_shutdown, d));
4170 	if (ep->buf)
4171 		X_FREE(ep->buf);
4172 	free(ep->in_buf);
4173 	// Allow the server reference to be freed.
4174 	if (ep->srv) srv_unref(ep->srv);
4175 
4176 	TASK_END;
4177 }
4178 
4179 
4180 /* }}} */
4181 
4182 /* {{{ Reply handler task */
4183 int const need_boot_special = 1;
4184 
4185 static void	server_handle_need_snapshot(server *srv, site_def const *s, node_no node);
4186 
reply_handler_task(task_arg arg)4187 int	reply_handler_task(task_arg arg)
4188 {
4189 	DECL_ENV
4190 	    server * s;
4191 	pax_msg * reply;
4192 	END_ENV;
4193 
4194 	TASK_BEGIN
4195 
4196 	ep->s = (server * )get_void_arg(arg);
4197 	srv_ref(ep->s);
4198 	ep->reply = NULL;
4199 
4200 	for (; ; ) {
4201 		while (!is_connected(&ep->s->con)) {
4202 			MAY_DBG(FN; STRLIT("waiting for connection"));
4203 			TASK_DELAY(1.000);
4204 		}
4205 		{
4206 			int64_t	n;
4207 			unchecked_replace_pax_msg(&ep->reply, pax_msg_new_0( null_synode));
4208 
4209 			ADD_EVENTS(
4210 				add_event(string_arg("ep->s->con.fd"));
4211 				add_event(int_arg(ep->s->con.fd));
4212 			);
4213 			TASK_CALL(read_msg(&ep->s->con, ep->reply, ep->s, &n));
4214 			ADD_EVENTS(
4215 				add_event(string_arg("ep->s->con.fd"));
4216 				add_event(int_arg(ep->s->con.fd));
4217 			);
4218 			ep->reply->refcnt = 1; /* Refcnt from other end is void here */
4219 			if (n <= 0) {
4220 				shutdown_connection(&ep->s->con);
4221 				continue;
4222 			}
4223 			receive_bytes[ep->reply->op] += (uint64_t)n + MSG_HDR_SIZE;
4224 		}
4225 		MAY_DBG(FN;
4226 		    NDBG(ep->s->con.fd, d); NDBG(task_now(), f);
4227 		    COPY_AND_FREE_GOUT(dbg_pax_msg(ep->reply));
4228 		    );
4229 		receive_count[ep->reply->op]++;
4230 
4231 		/* Special test for need_snapshot, since node and site may not be consistent */
4232 		if (need_boot_special && ep->reply->op == need_boot_op) {
4233 			pax_msg * p = ep->reply;
4234 			server_handle_need_snapshot(ep->s, get_site_def(), p->from);
4235 		}else{
4236 			//We only handle messages from this connection is the server is valid.
4237 			if(ep->s->invalid == 0)
4238 				dispatch_op(find_site_def(ep->reply->synode), ep->reply, NULL);
4239 		}
4240 		TASK_YIELD;
4241 	}
4242 
4243 	FINALLY
4244 	    replace_pax_msg(&ep->reply, NULL);
4245 
4246 	shutdown_connection(&ep->s->con);
4247 	ep->s->reply_handler = NULL;
4248 	MAY_DBG(FN; STRLIT(" shutdown "); NDBG(ep->s->con.fd, d); NDBG(task_now(), f));
4249 	srv_unref(ep->s);
4250 
4251 	TASK_END;
4252 }
4253 
4254 
4255 /* }}} */
4256 
4257 
4258 /* purecov: begin deadcode */
xcom_sleep(unsigned int seconds)4259 static inline void xcom_sleep(unsigned int seconds)
4260 {
4261 #if defined (WIN32) || defined (WIN64)
4262  Sleep((DWORD)seconds*1000); /* windows sleep takes milliseconds */
4263 #else
4264  sleep(seconds);
4265 #endif
4266 }
4267 
4268 /* purecov: end */
4269 
4270 /*
4271  * Get a unique long as the basis for XCom group id creation.
4272  *
4273  * NOTE:
 * As there is no gethostid() on Windows, we use the seconds since the epoch
 * (XORed with the process id) instead, so sites created within the same
 * second may end up with the same value.
4276  */
get_unique_long(void)4277 long	get_unique_long(void)
4278 {
4279 #if defined(WIN32) || defined(WIN64)
4280 	__time64_t ltime;
4281 
4282 	_time64( &ltime );
4283 	return (long) (ltime ^ GetCurrentProcessId());
4284 #else
4285 	return gethostid() ^ getpid();
4286 #endif
4287 }
4288 
4289 
4290 /* {{{ Coroutine macros */
4291 /*
   Coroutine device (more precisely a finite state machine, since local
   variables on the stack are not preserved between calls), a trick its
   inventor Tom Duff described as "too horrid to go into". The basic idea
   is that a switch can jump to a case label anywhere in its body. Before
   returning we record where we are, and when the routine is entered again
   we jump back there by switching on the state. The state is really the
   line number recorded by the CO_RETURN macro.
4299 */
4300 
/* Open the coroutine body. Requires an int lvalue named `state` in scope;
   state 0 means "start from the beginning". Any unknown state falls into
   the default label, asserts, and then restarts at case 0. */
#define CO_BEGIN switch(state){ default: assert(state == 0); case 0:
/* Close the switch opened by CO_BEGIN. */
#define CO_END    }

/* Record the current source line in `state`, return x, and on the next
   entry resume right after this point (the case label uses the same
   __LINE__ value). At most one CO_RETURN may appear per source line. */
#define CO_RETURN(x)                \
  {                                             \
    state = __LINE__;                          \
    return x;                                   \
  case __LINE__:;                               \
  }

/* Return x on this and every subsequent entry. */
#define HALT(x) while(1){ CO_RETURN(x);}
4312 
4313 /* purecov: begin deadcode */
send_app_data(app_data_ptr a)4314 void send_app_data(app_data_ptr a)
4315 {
4316 	pax_msg * msg = pax_msg_new(null_synode, get_proposer_site());
4317 	xcom_send(a, msg);
4318 }
4319 
xcom_send_data(uint32_t size,char * data)4320 void	xcom_send_data(uint32_t size, char *data)
4321 {
4322 	app_data_ptr a = new_app_data();
4323 	a->body.c_t = app_type;
4324 	a->body.app_u_u.data.data_len = size;
4325 	a->body.app_u_u.data.data_val = data;
4326 	send_app_data(a);
4327 }
4328 
create_config(node_list * nl,cargo_type type)4329 app_data_ptr create_config(node_list *nl, cargo_type type)
4330 {
4331 	app_data_ptr a = new_app_data();
4332 	a->body.c_t = type;
4333 	init_node_list(nl->node_list_len, nl->node_list_val, &a->body.app_u_u.nodes);
4334 	return a;
4335 }
4336 /* purecov: end */
4337 
init_config_with_group(app_data * a,node_list * nl,cargo_type type,uint32_t group_id)4338 app_data_ptr init_config_with_group(app_data *a, node_list *nl, cargo_type type,
4339                                     uint32_t group_id)
4340 {
4341 	init_app_data(a);
4342 	a->app_key.group_id = a->group_id = group_id;
4343 	a->body.c_t = type;
4344 	init_node_list(nl->node_list_len, nl->node_list_val, &a->body.app_u_u.nodes);
4345 	return a;
4346 }
4347 
4348 /* purecov: begin deadcode */
create_config_with_group(node_list * nl,cargo_type type,uint32_t group_id)4349 app_data_ptr create_config_with_group(node_list *nl, cargo_type type,
4350                                       uint32_t group_id)
4351 {
4352 	app_data_ptr a = new_app_data();
4353 	return init_config_with_group(a, nl, type, group_id);
4354 }
4355 
send_boot(node_list * nl)4356 void send_boot(node_list *nl)
4357 {	app_data_ptr a = create_config(nl, unified_boot_type);
4358 	install_node_group(a); 	/* Cannot get consensus unless group is known */
4359 	send_app_data(a);
4360 }
4361 
send_add_node(node_list * nl)4362 void send_add_node(node_list *nl)
4363 {
4364 	send_app_data(create_config(nl, add_node_type));
4365 }
4366 
send_remove_node(node_list * nl)4367 void send_remove_node(node_list *nl)
4368 {
4369 	send_app_data(create_config(nl, remove_node_type));
4370 }
4371 
send_config(node_list * nl)4372 void send_config(node_list *nl)
4373 {
4374 	send_app_data(create_config(nl, force_config_type));
4375 }
4376 
send_client_app_data(char * srv,xcom_port port,app_data_ptr a)4377 void send_client_app_data(char *srv, xcom_port port, app_data_ptr a)
4378 {
4379 	pax_msg * msg = pax_msg_new(null_synode, 0);
4380 	envelope *e = calloc(1, sizeof(envelope));
4381 
4382 	msg->a = a;
4383 	msg->to = VOID_NODE_NO;
4384 	msg->op = client_msg;
4385 	e->srv = strdup(srv);
4386 	e->port = port;
4387 	e->p = msg;
4388 	e->crash_on_error = 0;
4389 	task_new(client_task, void_arg(e), "client_task", XCOM_THREAD_DEBUG);
4390 }
4391 
send_client_boot(char * srv,xcom_port port,node_list * nl)4392 void send_client_boot(char *srv, xcom_port port, node_list *nl)
4393 {
4394 	send_client_app_data(srv, port, create_config(nl, unified_boot_type));
4395 }
4396 
send_client_add_node(char * srv,xcom_port port,node_list * nl)4397 void send_client_add_node(char *srv, xcom_port port, node_list *nl)
4398 {
4399 	send_client_app_data(srv, port, create_config(nl, add_node_type));
4400 }
4401 
send_client_remove_node(char * srv,xcom_port port,node_list * nl)4402 void send_client_remove_node(char *srv, xcom_port port, node_list *nl)
4403 {
4404 	send_client_app_data(srv, port, create_config(nl, remove_node_type));
4405 }
4406 
send_client_config(char * srv,xcom_port port,node_list * nl)4407 void send_client_config(char *srv, xcom_port port, node_list *nl)
4408 {
4409 	send_client_app_data(srv, port, create_config(nl, force_config_type));
4410 }
4411 /* purecov: end */
4412 
server_send_snapshot(server * srv,site_def const * s,gcs_snapshot * gcs_snap,node_no node)4413 static void	server_send_snapshot(server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node)
4414 {
4415 	pax_msg * p = pax_msg_new(gcs_snap->log_start, get_site_def());
4416 	ref_msg(p);
4417 	p->op = gcs_snapshot_op;
4418 	p->gcs_snap = gcs_snap;
4419 	send_msg(srv, s->nodeno, node, get_group_id(s), p);
4420 	unref_msg(&p);
4421 }
4422 
4423 /* purecov: begin deadcode */
send_snapshot(site_def const * s,gcs_snapshot * gcs_snap,node_no node)4424 static void	send_snapshot(site_def const *s, gcs_snapshot *gcs_snap, node_no node)
4425 {
4426 	assert(s->servers[node]);
4427 	server_send_snapshot(s->servers[node], s, gcs_snap, node);
4428 }
4429 /* purecov: end */
4430 
server_push_log(server * srv,synode_no push,node_no node)4431 static void	server_push_log(server *srv, synode_no push, node_no node)
4432 {
4433 	site_def const *s = get_site_def();
4434 	while (!synode_gt(push, get_max_synode())) {
4435 		if (is_cached(push)) {
4436 			pax_machine * p = get_cache_no_touch(push);
4437 			if (pm_finished(p)) {
4438 				/* Need to clone message here since pax_machine may be re-used while message is sent */
4439 				pax_msg * pm = clone_pax_msg(p->learner.msg);
4440 				if (pm != NULL) {
4441 				  ref_msg(pm);
4442 				  pm->op = recover_learn_op;
4443 				  send_msg(srv, s->nodeno, node, get_group_id(s), pm);
4444 				  unref_msg(&pm);
4445 				}
4446 			}
4447 		}
4448 		push = incr_synode(push);
4449 	}
4450 }
4451 
4452 /* purecov: begin deadcode */
push_log(synode_no push,node_no node)4453 static void	push_log(synode_no push, node_no node)
4454 {
4455 	site_def const * s = get_site_def();
4456 	assert(s->servers[node]);
4457 	server_push_log(s->servers[node], push, node);
4458 }
4459 /* purecov: end */
4460 
4461 static app_snap_getter get_app_snap;
4462 static app_snap_handler handle_app_snap;
4463 
4464 /* purecov: begin deadcode */
handle_need_snapshot(site_def const * s,node_no node)4465 static void	handle_need_snapshot(site_def const *s, node_no node)
4466 {
4467 	gcs_snapshot * gs = export_config();
4468 	synode_no app_lsn = get_app_snap(&gs->app_snap);
4469 	if (!synode_eq(null_synode, app_lsn) && synode_lt(app_lsn, gs->log_start))
4470 		gs->log_start = app_lsn;
4471 	send_snapshot(s, gs, node);
4472 	push_log(gs->log_start, node);
4473 }
4474 /* purecov: end */
4475 
server_handle_need_snapshot(server * srv,site_def const * s,node_no node)4476 static void	server_handle_need_snapshot(server *srv, site_def const *s, node_no node)
4477 {
4478 	gcs_snapshot * gs = export_config();
4479 	synode_no app_lsn = get_app_snap(&gs->app_snap);
4480 	if (!synode_eq(null_synode, app_lsn) && synode_lt(app_lsn, gs->log_start)){
4481 		gs->log_start = app_lsn;
4482 	}
4483 	else if (!synode_eq(null_synode, last_config_modification_id)) {
4484 		gs->log_start = last_config_modification_id;
4485 	}
4486 
4487 	server_send_snapshot(srv, s, gs, node);
4488 	server_push_log(srv, gs->log_start, node);
4489 }
4490 
4491 #define X(b) #b,
4492 const char *xcom_state_name[] = {
4493 	x_state_list
4494 };
4495 
4496 const char *xcom_actions_name[] = {
4497 	x_actions
4498 };
4499 #undef X
4500 
xcom_fsm(xcom_actions action,task_arg fsmargs)4501 xcom_state xcom_fsm(xcom_actions action, task_arg fsmargs)
4502 {
4503 	static int	state = 0;
4504 	G_DEBUG("state %d action %s", state, xcom_actions_name[action]);
4505 	switch (state) {
4506 	default:
4507 		assert(state == 0);
4508 	case 0:
4509 		/* Initialize basic xcom data */
4510 		xcom_thread_init();
4511 start:
4512 		for (; ; ) {
4513 			if (action == xa_init) {
4514 				xcom_shutdown = 0;
4515 				sent_alive = 0.0;
4516 				oom_abort = 0;
4517 			}
4518 			if (action == xa_u_boot) {
4519 /* purecov: begin deadcode */
4520 				node_list * nl = get_void_arg(fsmargs);
4521 				app_data_ptr a = create_config(nl, unified_boot_type);
4522 				install_node_group(a); 	/* Cannot get consensus unless group is known */
4523 				send_app_data(a);
4524 				set_executed_msg(incr_msgno(get_site_def()->start));
4525 				goto run;
4526 /* purecov: end */
4527 			}
4528 			if (action == xa_add) {
4529 /* purecov: begin deadcode */
4530 				add_args * a = get_void_arg(fsmargs);
4531 				send_client_add_node(a->addr, a->port, a->nl);
4532 /* purecov: end */
4533 			}
4534 			if (action == xa_net_boot) {
4535 				app_data * a = get_void_arg(fsmargs);
4536 				install_node_group(a);
4537 				set_executed_msg(incr_msgno(get_site_def()->start));
4538 				goto run;
4539 			}
4540 			if (action == xa_snapshot) {
4541 				goto recover;
4542 			}
4543 			if (action == xa_exit) {
4544 				/* Xcom is finished when we get here */
4545 				bury_site(get_group_id(get_site_def()));
4546 				task_terminate_all();    /* Kill, kill, kill, kill, kill, kill. This is the end. */
4547 
4548 				init_xcom_base();        /* Reset shared variables */
4549 				init_tasks();            /* Reset task variables */
4550 				free_site_defs();
4551 				free_forced_config_site_def();
4552 				garbage_collect_servers();
4553 				DBGOUT(FN; STRLIT("shutting down"));
4554 				xcom_shutdown = 1;
4555 				if(xcom_exit_cb)
4556 					xcom_exit_cb(get_int_arg(fsmargs));
4557 				G_DEBUG("Exiting xcom thread");
4558 			}
4559 			CO_RETURN(x_start);
4560 		}
4561 recover:
4562 		 {
4563 			pax_msg * p = get_void_arg(fsmargs);
4564 			import_config(p->gcs_snap);
4565 			handle_app_snap(&p->gcs_snap->app_snap);
4566 			set_executed_msg(p->gcs_snap->log_start);
4567 
4568 			set_last_received_config(p->gcs_snap->log_start);
4569 
4570 			DBGOUT(FN; SYCEXP(executed_msg); );
4571 			for (; ; ) {
4572 				if (action == xa_terminate) {
4573 					goto start;
4574 				}
4575 				if (action == xa_complete) {
4576 					goto run;
4577 				}
4578 
4579 				CO_RETURN(x_recover);
4580 			}
4581 		}
4582 run:
4583 		DBGOUT(FN; SYCEXP(executed_msg); );
4584 		if(xcom_run_cb)
4585 			xcom_run_cb(0);
4586 		force_recover = 0;
4587 		client_boot_done = 1;
4588 		netboot_ok = 1;
4589 		booting = 0;
4590 		set_proposer_startpoint();
4591 		create_proposers();
4592 		set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
4593 		set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
4594 		set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
4595 		set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));
4596 
4597 		for (; ; ) {
4598 			if (action == xa_terminate) {
4599 				force_recover = 0;
4600 				client_boot_done = 0;
4601 				netboot_ok = 0;
4602 				booting = 0;
4603 				oom_abort = 0;
4604 				terminate_proposers();
4605 				init_proposers();
4606 				task_terminate(executor);
4607 				set_task(&executor, NULL);
4608 				task_terminate(sweeper);
4609 				set_task(&sweeper, NULL);
4610 				task_terminate(detector);
4611 				set_task(&detector, NULL);
4612 				task_terminate(alive_t);
4613 				set_task(&alive_t, NULL);
4614 
4615 				init_xcom_base();             /* Reset shared variables */
4616 				free_site_defs();
4617 				free_forced_config_site_def();
4618 				garbage_collect_servers();
4619 				if(xcom_terminate_cb)
4620 					xcom_terminate_cb(get_int_arg(fsmargs));
4621 				goto start;
4622 			}
4623 			if (action == xa_need_snapshot) {
4624 				pax_msg * p = get_void_arg(fsmargs);
4625 				handle_need_snapshot(find_site_def(p->synode), p->from);
4626 			}
4627 			if (action == xa_force_config) {
4628 				app_data * a = get_void_arg(fsmargs);
4629 				site_def *s = create_site_def_with_start(a, executed_msg);
4630 				s->boot_key = executed_msg;
4631 				invalidate_servers(get_site_def(), s);
4632 				start_force_config(s);
4633 			}
4634 			CO_RETURN(x_run);
4635 		}
4636 	}
4637 }
4638 
4639 /* purecov: begin deadcode */
xcom_add_node(char * addr,xcom_port port,node_list * nl)4640 void xcom_add_node(char *addr, xcom_port port, node_list *nl)
4641 {
4642 	if (xcom_mynode_match(addr, port)) {
4643 		XCOM_FSM(xa_u_boot, void_arg(nl)); /* Boot */
4644 	} else {
4645 		add_args a;
4646 		a.addr = addr;
4647 		a.port = port;
4648 		a.nl = nl;
4649 		XCOM_FSM(xa_add, void_arg(&a)); /* Only initialize xcom */
4650 	}
4651 }
4652 
4653 
xcom_fsm_add_node(char * addr,node_list * nl)4654 void xcom_fsm_add_node(char *addr, node_list *nl)
4655 {
4656 	xcom_port	node_port = xcom_get_port(addr);
4657 	char	*node_addr = xcom_get_name(addr);
4658 
4659 	if (xcom_mynode_match(node_addr, node_port)) {
4660 		node_list x_nl;
4661 		x_nl.node_list_len = 1;
4662 		x_nl.node_list_val = new_node_address(x_nl.node_list_len, &addr);
4663 		XCOM_FSM(xa_u_boot, void_arg(&x_nl));
4664 		delete_node_address(x_nl.node_list_len, x_nl.node_list_val);
4665 	} else {
4666 		add_args a;
4667 		a.addr = node_addr;
4668 		a.port = node_port;
4669 		a.nl = nl;
4670 		XCOM_FSM(xa_add, void_arg(&a));
4671 	}
4672 	free(node_addr);
4673 }
4674 /* purecov: end */
4675 
set_app_snap_handler(app_snap_handler x)4676 void set_app_snap_handler(app_snap_handler x)
4677 {
4678 	handle_app_snap = x;
4679 }
4680 
set_app_snap_getter(app_snap_getter x)4681 void set_app_snap_getter(app_snap_getter x)
4682 {
4683 	get_app_snap = x;
4684 }
4685 
4686 /* Initialize sockaddr based on server and port */
init_sockaddr(char * server,struct sockaddr_in * sock_addr,socklen_t * sock_size,xcom_port port)4687 static int	init_sockaddr(char *server, struct sockaddr_in *sock_addr,
4688                           socklen_t *sock_size, xcom_port port)
4689 {
4690 	/* Get address of server */
4691 	struct addrinfo *addr = 0;
4692 
4693 	checked_getaddrinfo(server, 0, 0, &addr);
4694 
4695 	if (addr == 0) {
4696 		return 0;
4697 	}
4698 
4699 	/* Copy first address */
4700 	memcpy(sock_addr, addr->ai_addr, addr->ai_addrlen);
4701 	*sock_size = addr->ai_addrlen;
4702 	sock_addr->sin_port = htons(port);
4703 	freeaddrinfo(addr);
4704 
4705 	return 1;
4706 }
4707 
4708 
checked_create_socket(int domain,int type,int protocol)4709 static result	checked_create_socket(int domain, int type, int protocol)
4710 {
4711 	result	retval = {0,0};
4712 	int	retry = 1000;
4713 
4714 	do {
4715 		SET_OS_ERR(0);
4716 		retval.val = socket(domain, type, protocol);
4717 		retval.funerr = to_errno(GET_OS_ERR);
4718 	} while (--retry && retval.val == -1 && (from_errno(retval.funerr) == SOCK_EAGAIN));
4719 
4720 	if (retval.val == -1) {
4721 		task_dump_err(retval.funerr);
4722 #if defined (WIN32) || defined (WIN64)
4723 		G_MESSAGE("Socket creation failed with error %d.",
4724 			retval.funerr);
4725 #else
4726 		G_MESSAGE("Socket creation failed with error %d - %s.",
4727 			retval.funerr, strerror(retval.funerr));
4728 #endif
4729 		abort();
4730 	}
4731 	return retval;
4732 }
4733 
4734 /* Read max n bytes from socket fd into buffer buf */
socket_read(connection_descriptor * rfd,void * buf,int n)4735 static result	socket_read(connection_descriptor*rfd, void *buf, int n)
4736 {
4737 	result 	ret = {0,0};
4738 
4739 	assert(n >= 0);
4740 
4741 	do {
4742 	  ret = con_read(rfd, buf, n);
4743 	  task_dump_err(ret.funerr);
4744 	} while (ret.val < 0 && can_retry_read(ret.funerr));
4745 	assert(!can_retry_read(ret.funerr));
4746 	return ret;
4747 }
4748 
4749 
4750 /* Read exactly n bytes from socket fd into buffer buf */
socket_read_bytes(connection_descriptor * rfd,char * p,uint32_t n)4751 static int64_t	socket_read_bytes(connection_descriptor *rfd, char *p, uint32_t n)
4752 {
4753 	uint32_t	left= n;
4754 	char	*bytes= p;
4755 
4756 	result	nread = {0,0};
4757 
4758 	while (left > 0) {
4759 		/*
4760 		  socket_read just reads no more than INT_MAX bytes. We should not pass
4761 		  a length more than INT_MAX to it.
4762 		*/
4763 		int r = (int) MIN(left, INT_MAX);
4764 
4765 		nread = socket_read(rfd, bytes, r);
4766 		if (nread.val == 0) {
4767 			return 0;
4768 		} else if (nread.val < 0) {
4769 			return - 1;
4770 		} else {
4771 			bytes += nread.val;
4772 			left -= (uint32_t)nread.val;
4773 		}
4774 	}
4775 	assert(left == 0);
4776 	return n;
4777 }
4778 
4779 /* Write n bytes from buffer buf to socket fd */
socket_write(connection_descriptor * wfd,void * _buf,uint32_t n)4780 static int64_t	socket_write(connection_descriptor *wfd, void *_buf, uint32_t n)
4781 {
4782 	char	*buf = (char*) _buf;
4783 	result 	ret = {0,0};
4784 
4785 	uint32_t	total; /* Keeps track of number of bytes written so far */
4786 
4787 	total = 0;
4788 	while (total < n) {
4789 		int w= (int) MIN(n - total, INT_MAX);
4790 
4791 		while ((ret = con_write(wfd, buf + total, w)).val < 0 &&
4792 			   can_retry_write(ret.funerr)) {
4793 			task_dump_err(ret.funerr);
4794 			DBGOUT(FN; STRLIT("retry "); NEXP(total, d); NEXP(n, d));
4795 		}
4796 		if (ret.val <= 0) { /* Something went wrong */
4797 			task_dump_err(ret.funerr);
4798 			return - 1;
4799 		} else {
4800 			total += (uint32_t)ret.val; /* Add number of bytes written to total */
4801 		}
4802 	}
4803 	DBGOUT(FN; NEXP(total, u); NEXP(n, u));
4804 	assert(total == n);
4805 	return(total);
4806 }
4807 
xcom_close_socket(int * sock)4808 static inline result xcom_close_socket(int *sock)
4809 {
4810 	result res = {0,0};
4811 	if (*sock != -1) {
4812 		do {
4813 			SET_OS_ERR(0);
4814 			res.val = CLOSESOCKET(*sock);
4815 			res.funerr = to_errno(GET_OS_ERR);
4816 		} while (res.val == -1 && from_errno(res.funerr) == SOCK_EINTR);
4817 		*sock = -1;
4818 	}
4819 	return res;
4820 }
4821 
xcom_shut_close_socket(int * sock)4822 static inline result xcom_shut_close_socket(int *sock)
4823 {
4824 	result res = {0,0};
4825 	if (*sock >= 0) {
4826 #if defined (WIN32) || defined (WIN64)
4827                static LPFN_DISCONNECTEX DisconnectEx = NULL;
4828                if (DisconnectEx == NULL)
4829                {
4830                        DWORD dwBytesReturned;
4831                        GUID guidDisconnectEx = WSAID_DISCONNECTEX;
4832                        WSAIoctl(*sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
4833                                &guidDisconnectEx, sizeof(GUID),
4834                                &DisconnectEx, sizeof(DisconnectEx),
4835                                &dwBytesReturned, NULL, NULL);
4836                }
4837                if (DisconnectEx != NULL)
4838                {
4839                        (DisconnectEx(*sock, (LPOVERLAPPED)NULL,
4840                                (DWORD)0, (DWORD)0) == TRUE) ? 0 : -1;
4841                }
4842                else
4843 #endif
4844 		shutdown(*sock, _SHUT_RDWR);
4845 		res = xcom_close_socket(sock);
4846 	}
4847 	return res;
4848 }
4849 
/*
  Mark the connection attempt in timed_connect as failed and jump to its
  cleanup label.  This expands to two statements (no do/while wrapper), so
  it is only safe inside a braced block or a case label.  It relies on the
  local ret_fd and the label end.
*/
#define CONNECT_FAIL ret_fd = -1; goto end

/*
  Connect fd to sock_addr, waiting at most `timeout` ms (10000) for a
  non-blocking connect to complete.

  fd is switched to non-blocking mode for the attempt and back to blocking
  mode before returning.  The descriptor is never closed here; the caller
  keeps ownership.

  Returns fd on success, or -1 on failure.  Failure also covers the case
  where fd could not be made non-blocking, or could not be put back into
  blocking mode.

  NOTE(review): SET_OS_ERR(0) runs before block_fd() on every path, so the
  OS error visible to the caller after a -1 return may not describe the
  connect failure itself.
*/
static int timed_connect(int fd, sockaddr *sock_addr, socklen_t sock_size)
{
  int timeout = 10000;
  int ret_fd = fd;
  int syserr;
  int sysret;
  struct pollfd fds;
#ifdef WITH_LOG_DEBUG
  char buf[SYS_STRERROR_SIZE];
#endif

  /* Wait for writability: a non-blocking connect completes (or fails)
     when the socket becomes writable. */
  fds.fd = fd;
  fds.events = POLLOUT;
  fds.revents = 0;

  /* Set non-blocking */
  if (unblock_fd(fd) < 0)
    return -1;

  /* Trying to connect with timeout */
  SET_OS_ERR(0);
  sysret = connect(fd, sock_addr, sock_size);

  /* If connect() succeeded immediately, skip straight to restoring
     blocking mode and return fd. */
  if (is_socket_error(sysret)) {
    syserr = GET_OS_ERR;
    /* If the error is SOCK_EWOULDBLOCK or SOCK_EINPROGRESS or SOCK_EALREADY,
     * wait. */
    switch (syserr) {
      case SOCK_EWOULDBLOCK:
      case SOCK_EINPROGRESS:
      case SOCK_EALREADY:
        break;
      default:
        G_DEBUG("connect - Error connecting (socket=%d, error=%d).",
                fd, syserr);
        CONNECT_FAIL;
    }

    /* Retry poll() on SOCK_EINTR / SOCK_EINPROGRESS.  The timeout is not
       reduced between retries, so the total wait can exceed `timeout`. */
    SET_OS_ERR(0);
    while ((sysret = poll(&fds, 1, timeout)) < 0) {
      syserr = GET_OS_ERR;
      if (syserr != SOCK_EINTR && syserr != SOCK_EINPROGRESS) break;
      SET_OS_ERR(0);
    }
    MAY_DBG(FN; STRLIT("poll - Finished. "); NEXP(sysret, d));

    /* poll() returned 0: timed out.
       NOTE(review): sysret is always 0 here, so the "error" field of this
       message carries no information. */
    if (sysret == 0) {
      G_DEBUG("Timed out while waiting for connection to be established! "
              "Canceling connection attempt. (socket= %d, error=%d)",
              fd, sysret);
      CONNECT_FAIL;
    }

    if (is_socket_error(sysret)) {
      G_DEBUG("poll - Error while connecting! (socket= %d, error=%d)",
              fd, syserr);
      CONNECT_FAIL;
    }

    /* poll() reported activity.  Check the revents flags and the socket's
       pending error (SO_ERROR); any problem marks the attempt as failed
       but still falls through to the cleanup below. */
    {
      int socket_errno = 0;
      socklen_t socket_errno_len = sizeof(socket_errno);

      if ((fds.revents & POLLOUT) == 0) {
        MAY_DBG(FN; STRLIT("POLLOUT not set - Socket failure!"););
        ret_fd = -1;
      }

      if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) {
        MAY_DBG(FN;
                STRLIT("POLLERR | POLLHUP | POLLNVAL set - Socket failure!"););
        ret_fd = -1;
      }

      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &socket_errno,
                     &socket_errno_len) != 0) {
        G_DEBUG("getsockopt socket %d failed.", fd);
        ret_fd = -1;
      } else {
        if (socket_errno != 0) {
          G_DEBUG("Connection to socket %d failed with error %d - %s.", fd,
                  socket_errno, strerr_msg(buf, sizeof(buf), socket_errno));
          ret_fd = -1;
        }
      }
    }
  }

end:
  /* Set blocking.  Runs on every path after unblock_fd() succeeded;
     failing to restore blocking mode turns the result into -1. */
  SET_OS_ERR(0);
  if(block_fd(fd) < 0) {
    G_DEBUG(
        "Unable to set socket back to blocking state. (socket=%d, error=%d).",
        fd, GET_OS_ERR);
    return -1;
  }

  return ret_fd;
}
4952 
4953 
4954 /* Connect to server on given port */
connect_xcom(char * server,xcom_port port)4955 static connection_descriptor*	connect_xcom(char *server, xcom_port port)
4956 {
4957 	result fd = {0,0};
4958 	result ret = {0,0};
4959 	struct sockaddr_in sock_addr;
4960 	socklen_t sock_size;
4961 #ifdef WITH_LOG_DEBUG
4962         char buf[SYS_STRERROR_SIZE];
4963 #endif
4964 
4965 	DBGOUT(FN; STREXP(server); NEXP(port, d));
4966 	G_DEBUG("connecting to %s %d", server, port);
4967 	/* Create socket */
4968 	if ((fd = checked_create_socket(AF_INET, SOCK_STREAM, 0)).val < 0) {
4969 		G_DEBUG("Error creating sockets.");
4970 		return NULL;
4971 	}
4972 
4973 	/* Get address of server */
4974 	if (!init_sockaddr(server, &sock_addr, &sock_size, port)) {
4975 		xcom_close_socket(&fd.val);
4976 		G_DEBUG("Error initializing socket addresses.");
4977 		return NULL;
4978 	}
4979 
4980 	/* Connect socket to address */
4981 
4982 	SET_OS_ERR(0);
4983 	if (timed_connect(fd.val, (struct sockaddr *)&sock_addr, sock_size) == -1) {
4984 		fd.funerr = to_errno(GET_OS_ERR);
4985 		G_DEBUG("Connecting socket to address %s in port %d failed with error %d - %s.",
4986 				server, port, fd.funerr, strerr_msg(buf, sizeof(buf), fd.funerr));
4987 		xcom_close_socket(&fd.val);
4988 		return NULL;
4989 	}
4990 
4991 	{
4992 		int	peer = 0;
4993 		/* Sanity check before return */
4994 		SET_OS_ERR(0);
4995 		ret.val = peer = getpeername(fd.val, (struct sockaddr *)&sock_addr,
4996 		                             &sock_size);
4997 		ret.funerr = to_errno(GET_OS_ERR);
4998 		if (peer >= 0) {
4999 			ret = set_nodelay(fd.val);
5000 			if(ret.val < 0){
5001 				task_dump_err(ret.funerr);
5002 				xcom_shut_close_socket(&fd.val);
5003 #if defined (WIN32) || defined (WIN64)
5004 				G_DEBUG("Setting node delay failed while connecting to %s with error %d.",
5005 						server, ret.funerr);
5006 #else
5007 				G_DEBUG("Setting node delay failed while connecting to %s with error %d - %s.",
5008 						server, ret.funerr, strerror(ret.funerr));
5009 #endif
5010 				return NULL;
5011 			}
5012 			G_DEBUG("client connected to %s %d fd %d", server, port, fd.val);
5013 		} else {
5014 			/* Something is wrong */
5015 			socklen_t errlen = sizeof(ret.funerr);
5016 			DBGOUT(FN; STRLIT("getpeername failed"); );
5017 			if (ret.funerr) {
5018 				DBGOUT(FN; NEXP(from_errno(ret.funerr), d);
5019 					   STRLIT(strerror(from_errno(ret.funerr))));
5020 			}
5021 			getsockopt(fd.val, SOL_SOCKET, SO_ERROR, (void *) & ret.funerr, &errlen);
5022 			if (ret.funerr == 0) {
5023 				ret.funerr = to_errno(SOCK_ECONNREFUSED);
5024 			}
5025 			xcom_shut_close_socket(&fd.val);
5026 #if defined (WIN32) || defined (WIN64)
5027 			G_DEBUG("Getting the peer name failed while connecting to server %s with error %d.",
5028 					server, ret.funerr);
5029 #else
5030 			G_DEBUG("Getting the peer name failed while connecting to server %s with error %d -%s.",
5031 					server, ret.funerr, strerror(ret.funerr));
5032 #endif
5033 			return NULL;
5034 		}
5035 
5036 #ifdef XCOM_HAVE_OPENSSL
5037 		if (xcom_use_ssl()) {
5038 			connection_descriptor *cd = 0;
5039 			SSL * ssl = SSL_new(client_ctx);
5040 			G_DEBUG("Trying to connect using SSL.")
5041 			SSL_set_fd(ssl, fd.val);
5042 
5043 			ERR_clear_error();
5044 			ret.val = SSL_connect(ssl);
5045 			ret.funerr = to_ssl_err(SSL_get_error(ssl, ret.val));
5046 
5047 			if (ret.val != SSL_SUCCESS) {
5048 				G_MESSAGE("Error connecting using SSL %d %d.",
5049 					  ret.funerr, SSL_get_error(ssl, ret.val));
5050 				task_dump_err(ret.funerr);
5051 				SSL_shutdown(ssl);
5052 				SSL_free(ssl);
5053 				xcom_shut_close_socket(&fd.val);
5054 				return NULL;
5055 			}
5056 			DBGOUT(FN; STRLIT("ssl connected to "); STRLIT(server); NDBG(port,d); NDBG(fd.val, d); PTREXP(ssl));
5057 
5058 			if (ssl_verify_server_cert(ssl, server))
5059 			{
5060 				G_MESSAGE("Error validating certificate and peer.");
5061 				task_dump_err(ret.funerr);
5062 				SSL_shutdown(ssl);
5063 				SSL_free(ssl);
5064 				xcom_shut_close_socket(&fd.val);
5065 				return NULL;
5066 			}
5067 
5068 			cd = new_connection(fd.val, ssl);
5069 			set_connected(cd, CON_FD);
5070 			G_DEBUG("Success connecting using SSL.")
5071 			return cd;
5072 		} else {
5073 			connection_descriptor *cd = new_connection(fd.val, 0);
5074 			set_connected(cd, CON_FD);
5075 			return cd;
5076 		}
5077 #else
5078 		{
5079 			connection_descriptor *cd = new_connection(fd.val);
5080 			set_connected(cd, CON_FD);
5081 			return cd;
5082 		}
5083 #endif
5084 	}
5085 }
5086 
xcom_open_client_connection(char * server,xcom_port port)5087 connection_descriptor*	xcom_open_client_connection(char *server, xcom_port port)
5088 {
5089 	return connect_xcom(server, port);
5090 }
5091 
5092 /* Send a protocol negotiation message on connection con */
xcom_send_proto(connection_descriptor * con,xcom_proto x_proto,x_msg_type x_type,unsigned int tag)5093 static int	xcom_send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag)
5094 {
5095 	char	buf[MSG_HDR_SIZE];
5096         memset(buf, 0, MSG_HDR_SIZE);
5097 
5098 	if (con->fd >= 0) {
5099 		con->snd_tag = tag;
5100 		write_protoversion(VERS_PTR((unsigned char*) buf), x_proto);
5101 		put_header_1_0((unsigned char*) buf, 0, x_type, tag);
5102 		{
5103 			int	sent;
5104 			sent = (int)socket_write(con, buf, MSG_HDR_SIZE);
5105 			if (con->fd < 0) {
5106 				return -1;
5107 			}
5108 			return sent;
5109 		}
5110 	} else {
5111 		return -1;
5112 	}
5113 }
5114 
xcom_recv_proto(connection_descriptor * rfd,xcom_proto * x_proto,x_msg_type * x_type,unsigned int * tag)5115 static int	xcom_recv_proto(connection_descriptor * rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag)
5116 {
5117 	int	n;
5118 	unsigned char	header_buf[MSG_HDR_SIZE];
5119 	uint32_t	msgsize;
5120 
5121 	/* Read length field, protocol version, and checksum */
5122 	n = (int)socket_read_bytes(rfd, (char*)header_buf, MSG_HDR_SIZE);
5123 
5124 	if (n != MSG_HDR_SIZE) {
5125 		DBGOUT(FN; NDBG(n, d));
5126 		return -1;
5127 	}
5128 
5129 	*x_proto = read_protoversion(VERS_PTR(header_buf));
5130 	get_header_1_0(header_buf, &msgsize, x_type, tag);
5131 
5132 	return n;
5133 }
5134 
5135 #define TAG_START 313
5136 
xcom_send_client_app_data(connection_descriptor * fd,app_data_ptr a,int force)5137 static int64_t	xcom_send_client_app_data(connection_descriptor *fd, app_data_ptr a, int force)
5138 {
5139 	pax_msg * msg = pax_msg_new(null_synode, 0);
5140 	uint32_t	buflen = 0;
5141 	char	*buf = 0;
5142 	int64_t retval= 0;
5143 
5144 	if(! proto_done(fd)){
5145 		xcom_proto x_proto;
5146 		x_msg_type x_type;
5147 		unsigned int tag;
5148 		retval = xcom_send_proto(fd, my_xcom_version, x_version_req, TAG_START);
5149 		G_DEBUG("client sent negotiation request for protocol %d",my_xcom_version);
5150 		if(retval < 0)
5151 			goto end;
5152 		retval = xcom_recv_proto(fd, &x_proto, &x_type, &tag);
5153 		if(retval < 0)
5154 			goto end;
5155 		if(tag != TAG_START)
5156 		{
5157 			retval = -1;
5158 			goto end;
5159 		}
5160 		if(x_type != x_version_reply)
5161 		{
5162 			retval = -1;
5163 			goto end;
5164 		}
5165 
5166 		if(x_proto == x_unknown_proto){
5167 			G_DEBUG("no common protocol, returning error");
5168 			retval = -1;
5169 			goto end;
5170 		}
5171 		G_DEBUG("client connection will use protocol version %d",x_proto);
5172 		DBGOUT(STRLIT("client connection will use protocol version ");
5173 			   NDBG(x_proto,u); STRLIT(xcom_proto_to_str(x_proto)));
5174 		fd->x_proto = x_proto;
5175 		set_connected(fd, CON_PROTO);
5176 	}
5177 	msg->a = a;
5178 	msg->to = VOID_NODE_NO;
5179 	msg->op = client_msg;
5180 	msg->force_delivery = force;
5181 
5182 	serialize_msg(msg, fd->x_proto, &buflen, &buf);
5183 	if(buflen){
5184 		retval = socket_write(fd, buf, buflen);
5185 		if (buflen != retval) {
5186 			DBGOUT(FN; STRLIT("write failed "); NDBG(fd->fd, d);
5187 				   NDBG(buflen, d); NDBG(retval, d));
5188 		}
5189 		X_FREE(buf);
5190 	}
5191 end:
5192 	msg->a = 0; /* Do not deallocate a */
5193 	XCOM_XDR_FREE(xdr_pax_msg, msg);
5194 	return retval;
5195 }
5196 
xcom_client_send_data(uint32_t size,char * data,connection_descriptor * fd)5197 int64_t	xcom_client_send_data(uint32_t size, char *data, connection_descriptor *fd)
5198 {
5199 	app_data a;
5200 	int64_t retval = 0;
5201 	init_app_data(&a);
5202 	a.body.c_t = app_type;
5203 	a.body.app_u_u.data.data_len = size;
5204 	a.body.app_u_u.data.data_val = data;
5205 	retval = xcom_send_client_app_data(fd, &a, 0);
5206 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5207 	return retval;
5208 }
5209 
socket_read_msg(connection_descriptor * rfd,pax_msg * p)5210 static pax_msg *	socket_read_msg(connection_descriptor *rfd, pax_msg *p)
5211 /* Should buffer reads as well */
5212 {
5213 	int64_t	n;
5214 	char	*bytes;
5215 	unsigned char header_buf[MSG_HDR_SIZE];
5216 	xcom_proto x_version;
5217 	uint32_t	msgsize;
5218 	x_msg_type x_type;
5219 	unsigned int tag;
5220 	int deserialize_ok = 0;
5221 
5222 	bytes = NULL;
5223 
5224 	/* Read version, length, type, and tag */
5225 	n = socket_read_bytes(rfd, (char*)header_buf, MSG_HDR_SIZE);
5226 
5227 	if (n <= 0) {
5228 		DBGOUT(FN; NDBG(n, ll));
5229 		return 0;
5230 	}
5231 	assert(n == MSG_HDR_SIZE);
5232 	x_version = get_32(VERS_PTR(header_buf));
5233 	/* Check the protocol version before doing anything else */
5234 #ifdef XCOM_PARANOID
5235 	assert(check_protoversion(x_version, rfd->x_proto));
5236 #endif
5237 	if (!check_protoversion(x_version, rfd->x_proto)) {
5238 		return 0;
5239 	}
5240 
5241 	/* OK, we can grok this version */
5242 
5243 	get_header_1_0(header_buf, &msgsize, & x_type, &tag);
5244 
5245 	/* Allocate buffer space for message */
5246 	bytes = calloc(1, msgsize);
5247 
5248 	/* Read message */
5249 	n = socket_read_bytes(rfd, bytes, msgsize);
5250 
5251 	if (n > 0) {
5252 		/* Deserialize message */
5253 		deserialize_ok = deserialize_msg(p, rfd->x_proto, bytes, msgsize);
5254 		MAY_DBG(FN; STRLIT(" deserialized message"));
5255 	}
5256 	/* Deallocate buffer */
5257 	X_FREE(bytes);
5258 	if (n <= 0 || deserialize_ok == 0) {
5259 		DBGOUT(FN; NDBG(n, ll));
5260 		return 0;
5261 	}
5262 	return(p);
5263 }
5264 
xcom_close_client_connection(connection_descriptor * connection)5265 int	xcom_close_client_connection(connection_descriptor *connection)
5266 {
5267 	int	retval = 0;
5268 
5269 #ifdef XCOM_HAVE_OPENSSL
5270 	if (connection->ssl_fd) {
5271 		SSL_shutdown(connection->ssl_fd);
5272 		ssl_free_con(connection);
5273 	}
5274 #endif
5275 	retval =  xcom_shut_close_socket(&connection->fd).val;
5276 	free(connection);
5277 	return retval;
5278 }
5279 
xcom_client_boot(connection_descriptor * fd,node_list * nl,uint32_t group_id)5280 int	xcom_client_boot(connection_descriptor *fd, node_list *nl, uint32_t group_id)
5281 {
5282 	app_data a;
5283 	int retval = 0;
5284 	retval =  (int)xcom_send_client_app_data(fd, init_config_with_group(&a, nl, unified_boot_type, group_id), 0);
5285 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5286 	return retval;
5287 }
5288 
xcom_send_app_wait(connection_descriptor * fd,app_data * a,int force)5289 int xcom_send_app_wait(connection_descriptor *fd, app_data *a, int force)
5290 {
5291 	int retval = 0;
5292 	int retry_count = 10; // Same as 'connection_attempts'
5293 	pax_msg p;
5294 	pax_msg *rp = 0;
5295 
5296 	do {
5297 		retval = (int)xcom_send_client_app_data(fd, a, force);
5298 		if(retval < 0)
5299 			return 0;
5300 		memset(&p, 0, sizeof(p));
5301 		rp = socket_read_msg(fd, &p);
5302 		if(rp){
5303 			client_reply_code cli_err = rp->cli_err;
5304 			my_xdr_free((xdrproc_t)xdr_pax_msg, (char*)&p);
5305 			switch(cli_err){
5306 				case REQUEST_OK:
5307 					return 1;
5308 				case REQUEST_FAIL:
5309                                         G_DEBUG("cli_err %d",cli_err);
5310 					return 0;
5311 				case REQUEST_RETRY:
5312 			                G_DEBUG("cli_err %d",cli_err);
5313 					xcom_sleep(1);
5314 					break;
5315 				default:
5316 					G_WARNING("client protocol botched");
5317 					return 0;
5318 			}
5319 		}else{
5320 			G_WARNING("read failed");
5321 			return 0;
5322 		}
5323 	} while (--retry_count);
5324 	// Timeout after REQUEST_RETRY has been received 'retry_count' times
5325 	G_MESSAGE(
5326 	 "Request failed: maximum number of retries (10) has been exhausted.");
5327 	return 0;
5328 }
5329 
xcom_send_cfg_wait(connection_descriptor * fd,node_list * nl,uint32_t group_id,cargo_type ct,int force)5330 int xcom_send_cfg_wait(connection_descriptor * fd, node_list *nl,
5331                        uint32_t group_id, cargo_type ct, int force)
5332 {
5333 	app_data a;
5334 	int retval = 0;
5335 	DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(nl)););
5336 	retval = xcom_send_app_wait(fd, init_config_with_group(&a, nl, ct, group_id), force);
5337 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5338 	return retval;
5339 }
5340 
xcom_client_add_node(connection_descriptor * fd,node_list * nl,uint32_t group_id)5341 int	xcom_client_add_node(connection_descriptor *fd, node_list *nl,
5342                          uint32_t group_id)
5343 {
5344 	return xcom_send_cfg_wait(fd, nl, group_id, add_node_type, 0);
5345 }
5346 
xcom_client_remove_node(connection_descriptor * fd,node_list * nl,uint32_t group_id)5347 int	xcom_client_remove_node(connection_descriptor *fd, node_list *nl,
5348                             uint32_t group_id)
5349 {
5350 	return xcom_send_cfg_wait(fd, nl, group_id, remove_node_type, 0);
5351 }
5352 
#ifdef NOTDEF
/* Not completely implemented, need to be handled properly
   when received as a client message in dispatch_op.
   Should have separate opcode from normal add/remove,
   like force config_type.

   The connection parameter is named fd so these definitions compile if
   NOTDEF is ever enabled.  The bodies refer to fd, and an unnamed
   parameter is not valid in a C function definition before C23. */
int xcom_client_force_add_node(connection_descriptor *fd, node_list *nl,
                               uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, add_node_type, 1);
}

int xcom_client_force_remove_node(connection_descriptor *fd, node_list *nl,
                                  uint32_t group_id)
{
	return xcom_send_cfg_wait(fd, nl, group_id, remove_node_type, 1);
}
#endif
5370 
xcom_client_force_config(connection_descriptor * fd,node_list * nl,uint32_t group_id)5371 int xcom_client_force_config(connection_descriptor *fd, node_list *nl,
5372                              uint32_t group_id)
5373 {
5374 	return xcom_send_cfg_wait(fd, nl, group_id, force_config_type, 1);
5375 }
5376 
xcom_client_enable_arbitrator(connection_descriptor * fd)5377 int	xcom_client_enable_arbitrator(connection_descriptor *fd)
5378 {
5379 	app_data a;
5380 	int retval = 0;
5381 	init_app_data(&a);
5382 	a.body.c_t = enable_arbitrator;
5383 	retval = xcom_send_app_wait(fd, &a, 0);
5384 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5385 	return retval;
5386 }
5387 
5388 
xcom_client_disable_arbitrator(connection_descriptor * fd)5389 int	xcom_client_disable_arbitrator(connection_descriptor *fd)
5390 {
5391 	app_data a;
5392 	int retval = 0;
5393 	init_app_data(&a);
5394 	a.body.c_t = disable_arbitrator;
5395 	retval = xcom_send_app_wait(fd, &a, 0);
5396 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5397 	return retval;
5398 }
5399 
xcom_client_terminate_and_exit(connection_descriptor * fd)5400 int	xcom_client_terminate_and_exit(connection_descriptor *fd)
5401 {
5402 	app_data a;
5403 	int retval = 0;
5404 	init_app_data(&a);
5405 	a.body.c_t = x_terminate_and_exit;
5406 	retval = xcom_send_app_wait(fd, &a, 0);
5407 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5408 	return retval;
5409 }
5410 
xcom_client_set_cache_limit(connection_descriptor * fd,uint64_t cache_limit)5411 int	xcom_client_set_cache_limit(connection_descriptor *fd, uint64_t cache_limit)
5412 {
5413 	app_data a;
5414 	int retval = 0;
5415 	init_app_data(&a);
5416 	a.body.c_t = set_cache_limit;
5417 	a.body.app_u_u.cache_limit = cache_limit;
5418 	retval = xcom_send_app_wait(fd, &a, 0);
5419 	my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5420 	return retval;
5421 }
5422 
5423 
5424 
5425 
5426