1 /* Copyright (c) 2012, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */
22
23 /** \file The new version of xcom is a major rewrite to allow
24 transmission of multiple messages from several sources
25 simultaneously without collision. The interface to xcom is largely
26 intact, one notable change is that xcom will consider the message
27 delivered as soon as it has got a majority. Consequently, the VP
28 set will not necessarily show all nodes which will actually
29 receive the message.
30
31 OHKFIX Add wait for complete last known node set to mimic the old
32 semantics.
33
34
35 IMPORTANT: What xcom does and what it does not do:
36
37 xcom messages are received in the same order on all nodes.
38
39 xcom guarantees that if a message is delivered to one node, it will
40 eventually be seen on all other nodes as well.
41
42 xcom messages are available to a crashed node when it comes up
43 again if at least one node which knows the value of the message
44 has not crashed. The size of the message cache is configurable.
45
46 OHKFIX Add logging to disk to make messages durable across system
47 crash and to increase the number of messages which may be cached.
48
49 There is no guarantee whatsoever about the order of messages from
50 different nodes, not even the order of multiple messages from the
51 same node. It is up to the client to impose such an order by
52 waiting on a message before it sends the next.
53
54 xcom can notify the client that a message has timed out, and in
55 that case will try to cancel the message, but it cannot guarantee
56 that a message which has timed out will not be delivered.
57
58 xcom attaches a node set to each message as it is delivered to the
59 client. This node set reflects the current node set that xcom
60 believes is active, it does not mean that the message has been
61 delivered yet to all nodes in the set. Neither does it mean that
62 the message has not been delivered to the nodes not in the set.
63
64 A cache of Paxos state machines is central to the new design. The
65 purpose of the cache is both to store a window of messages, and to
66 decouple the different parts of xcom, like message proposal,
67 message delivery and execution, and recovery. The old cache was
68 limited to caching messages, and a single state machine ran the
69 combined VP and Paxos algorithm. This constrained xcom to deliver
70 only a single message at a time.
71
72 Each instance of the Paxos state machine implements the basic
73 Paxos protocol. Unlike the cache in the old system, it is not
74 cleared when a site is deleted. This removes some problems
75 related to message delivery during site deletion. The cache is a
76 classic fixed size LRU with a hash index.
77
Some extensions to the basic Paxos algorithm have been implemented:
79
80 A node has ownership to all synodes with its own node number. Only
81 a node with node number N can propose a value for synode {X N},
82 where X is the sequence number, and N is the node number. Other
83 nodes can only propose the special value no_op for synode {X N}.
84 The reason for this is to retain the leaderless Paxos algorithm,
85 but to avoid collisions between nodes which are competing for the
86 same synode number. With this scheme, each node has its own unique
87 number series during normal operation. The scheme has the
88 following implications:
89
90 1. If a node N has not already proposed a value for the synode {X N},
91 it may at any time send a LEARN message to the other nodes with
92 the reserved value no_op, without going through phase 1 and 2 of
93 Paxos. This is because the other nodes are constrained to propose
94 no_op for this synode, so the final outcome will always be no_op.
95 To avoid unnecessary message transmission, a node will try to
96 broadcast the no_op LEARN messages by piggybacking the information
97 on the messages of the basic Paxos protocol.
98
99 2. Other nodes which want to find the value of synode {X N} may do
100 so by trying to get the value no_op accepted by following the
101 basic Paxos algorithm. The result will be the actual value
102 proposed by node N if it has done so, otherwise no_op. This will
103 typically only be necessary when a node is down, and the other
104 nodes need to find the values from the missing node in order to be
105 able to continue execution.
106
107 Messages are delivered in order to the client, and the order is
108 determined by the sequence number and the node number, with the
109 sequence number as the most significant part.
110
111 The xcom network interface has been redesigned and is now
112 implemented directly on top of TCP, and has so far been completely
113 trouble free. We use poll() or select() to implement non-blocking
114 send and receive, but libev could equally well have been used.
115
116 Multicast is implemented on top of unicast as before, but the
117 implementation is prepared to use real multicast with relatively
118 minor changes.
119
120 The roles of proposer, acceptor/learner, and executor are now
121 directly mapped to unique task types which interact with the Paxos
122 state machines, whereas the previous implementation folded all the
123 roles into a single event driven state machine.
124
125 The following terminology will be used:
126
127 A node is an instance of the xcom thread. There is only one instance
128 of the xcom thread in the agent.
129 A client is the application which is using xcom to send messages.
130 A thread is a real OS thread.
131 A task is a logical process. It is implemented by coroutines and
132 an explicit stack.
133
134 The implementation of tasks and non-blocking socket operations is
135 isolated in task.h and task.c.
136
137 A node will open a tcp connection to each of the other nodes. This
138 connection is used for all communication initiated by the node,
and replies to messages will arrive on the connection on which they
were sent.
141
142 static int tcp_server(task_arg);
143
144 The tcp_server listens on the xcom port and starts an
145 acceptor_learner_task whenever a new connection is detected.
146
147 static int tcp_reaper_task(task_arg);
148
Closes tcp connections which have been unused for too long.
150
151 static int sender_task(task_arg);
152
The sender_task waits for tcp messages on its input queue and
sends them on the tcp socket. If the socket is closed for any
155 reason, the sender_task will reconnect the socket. There is one
156 sender_task for each socket. The sender task exists mainly to
157 simplify the logic in the other tasks, but it could have been
158 replaced with a coroutine which handles the connection logic after
159 having reserved the socket for its client task.
160
161 static int generator_task(task_arg);
162
163 The generator_task reads messages from the client queue and moves
164 them into the input queue of the proposer_task.
165
166 OHKFIX Use a tcp socket instead of the client queue. We can then
167 remove the generator_task and let the acceptor_learner_task do the
168 dispatching.
169
170 static int proposer_task(task_arg);
171
172 Assign a message number to an outgoing message and try to get it
173 accepted. There may be several proposer tasks on each node
174 working in parallel. If there are multiple proposer tasks, xcom can
175 not guarantee that the messages will be sent in the same order as
176 received from the client.
177
178 static int acceptor_learner_task(task_arg);
179
180 This is the server part of the xcom thread. There is one
acceptor_learner_task for each node in the system. The
acceptor_learner_task reads messages from the socket, finds the correct
183 Paxos state machine, and dispatches to the correct message handler
184 with the state machine and message as arguments.
185
186 static int reply_handler_task(task_arg);
187
188 The reply_handler_task does the same job as the
189 acceptor_learner_task, but listens on the socket which the node
190 uses to send messages, so it will handle only replies on that
191 socket.
192
193 static int executor_task(task_arg);
194
The executor_task waits for a Paxos message to be accepted. When
196 the message is accepted, it is delivered to the client,
197 unless it is a no-op. In either case, the executor_task steps to
198 the next message and repeats the wait. If it times out waiting for
199 a message, it will try to get a no-op accepted.
200
201 static int alive_task(task_arg);
202
203 Sends i-am-alive to other nodes if there has been no normal traffic
204 for a while. It also pings nodes which seem to be inactive.
205
206 static int detector_task(task_arg);
207
208 The detector_task periodically scans the set of connections from
209 other nodes and sees if there has been any activity. If there has
210 been no activity for some time, it will assume that the node is
211 dead, and send a view message to the client.
212
213 static int boot_task(task_arg);
214
215 The boot task is started whenever xcom has no site definition. It
216 listens on the input queue until it detects either a boot or
217 recovery. In case of a boot, it will wait for a unified boot
message. In case of local recovery, it will wait until it has seen
all recover messages. In both cases, the proposer task will try
to get those messages accepted.
221
222 static int boot_killer_task(task_arg);
223
224 Abort the boot process if there is no progress.
225
226 static int net_boot_task(task_arg);
227 static int net_recover_task(task_arg);
228
229 Reconfiguration:
230
231 The xcom reconfiguration process is essentially the one described in
232 "Reconfiguring a State Machine" by Lamport et al. as the R-alpha
233 algorithm.
234 We execute the reconfiguration command immediately, but the config is
235 only valid after a delay of alpha messages.
The parameter alpha is the same as the event horizon (the
event_horizon variable, initially EVENT_HORIZON_MIN) in this
implementation; see the static function too_far().
238 All tcp messages from beyond the event horizon will be ignored.
239
240 */
241
242 #include "x_platform.h"
243
244 #include <stdio.h>
245 #include <string.h>
246 #include <errno.h>
247 #include <stdlib.h>
248 #include <sys/types.h>
249 #include <assert.h>
250 #include <signal.h>
251 #include <sys/time.h>
252 #include <limits.h>
253
254 #ifndef WIN
255 #include <sys/socket.h>
256 #include <netdb.h>
257 #include <sys/ioctl.h>
258 #include <net/if.h>
259 #ifndef __linux__
260 #include <sys/sockio.h>
261 #endif
262 #endif
263
264 #if defined(WIN32) || defined(WIN64)
265 #include <windows.h>
266 #endif
267
268 #ifndef _WIN32
269 #include <poll.h>
270 #endif
271
272 #include "xdr_utils.h"
273 #include "xcom_common.h"
274
275 #include "task_os.h"
276
277 #include "xcom_vp.h"
278
279 #include "simset.h"
280 #include "app_data.h"
281
282 #include "task.h"
283 #include "node_no.h"
284 #include "server_struct.h"
285 #include "xcom_detector.h"
286 #include "site_struct.h"
287 #include "xcom_transport.h"
288 #include "xcom_base.h"
289
290 #ifdef XCOM_HAVE_OPENSSL
291 #include "xcom_ssl_transport.h"
292 #endif
293
294 #include "task.h"
295 #include "task_net.h"
296 #include "task_debug.h"
297 #include "xcom_statistics.h"
298 #include "node_set.h"
299 #include "node_list.h"
300 #include "bitset.h"
301
302 #include "xcom_cache.h"
303
304 #include "xcom_vp_str.h"
305 #include "pax_msg.h"
306 #include "xcom_msg_queue.h"
307 #include "xcom_recover.h"
308 #include "synode_no.h"
309 #include "sock_probe.h"
310 #include "xcom_interface.h"
311 #include "xcom_memory.h"
312 #include "site_def.h"
313 #include "xcom_cfg.h"
314
315 #ifdef XCOM_HAVE_OPENSSL
316 #include "openssl/ssl.h"
317 #endif
318
/* {{{ Defines and constants */

/* Buffer size for formatted system error strings.
   NOTE(review): meaning inferred from the name; no user in this chunk. */
#define SYS_STRERROR_SIZE 512
/* Termination delay (a double). NOTE(review): units presumably seconds --
   confirm at the use site. */
#define TERMINATE_DELAY 3.0
/* Initial value of event_horizon. The event horizon is the alpha of the
   R-alpha reconfiguration scheme described in the file header. */
#define EVENT_HORIZON_MIN 10
/* Current event horizon. It starts at EVENT_HORIZON_MIN and is overwritten
   by set_event_horizon(). */
unsigned int event_horizon = EVENT_HORIZON_MIN;
325
326 static void set_event_horizon(unsigned int eh) MY_ATTRIBUTE((unused));
327 /* purecov: begin deadcode */
set_event_horizon(unsigned int eh)328 static void set_event_horizon(unsigned int eh)
329 {
330 DBGOUT(FN; NDBG(eh,u));
331 event_horizon = eh;
332 }
333 /* purecov: end */
334
/* The number of proposers on one node (size of the proposer[] task array) */
#define PROPOSERS 10

/* Limit the number of acceptors. When defined, max_check() caps the number
   of nodes counted by majority() at MAXACCEPT. */
/* #define MAXACCEPT 5 */

/* Skip prepare for first ballot */
int const threephase = 0;

/* Error injection for testing */
#define INJECT_ERROR 0

/* Crash a node early */
/* #define CRASH 1 */

/* }}} */


#include "retry.h"

/* #define USE_EXIT_TYPE */
/* #define NO_SWEEPER_TASK */

/* Upper bound on the batch size */
enum{
  MAX_BATCH_SIZE = 0x3fffffff
};

/* When nonzero, majority() reports a quorum for any two-node group,
   regardless of how many nodes answered. */
int ARBITRATOR_HACK = 0;
static int AUTOBATCH = 1;
#define AGGRESSIVE_SWEEP

static int const no_duplicate_payload = 1;

/* Use buffered read when reading messages from the network */
static int use_buffered_read = 1;

/* Used to handle OOM errors */
static unsigned short oom_abort = 0;
374
/* {{{ Forward declarations */

/* Non-static helpers whose definitions are not in this part of the file */
long get_unique_long(void);
unsigned long msg_count(app_data_ptr a);
void get_host_name(char *a, char *name);

static double wakeup_delay(double old);

/* Task types (entry points passed to task_new()) */
static int proposer_task(task_arg arg);
static int executor_task(task_arg arg);
static int sweeper_task(task_arg arg);
extern int alive_task(task_arg arg);
static int generator_task(task_arg arg);
extern int detector_task(task_arg arg);

/* Paxos state machine predicates and helpers used before their definitions */
static int finished(pax_machine *p);
static int accepted(pax_machine *p);
static int started(pax_machine *p);
static synode_no first_free_synode(synode_no msgno);
static void free_forced_config_site_def();

extern void bit_set_or(bit_set *x, bit_set const *y);

/* }}} */
400
/* {{{ Global variables */

/* These globals are reset to the same initial values by init_base_vars(). */
int xcom_shutdown = 0; /* Xcom_Shutdown flag */
synode_no executed_msg; /* The message we are waiting to execute */
synode_no max_synode; /* Max message number seen so far */
/* Handles of singleton tasks (NULL when not running) */
task_env *boot = NULL;
task_env *detector = NULL;
task_env *killer = NULL;
task_env *net_boot = NULL;
task_env *net_recover = NULL;
void *xcom_thread_input = 0;

static void init_proposers();
414
init_base_vars()415 void init_base_vars()
416 {
417 xcom_shutdown = 0; /* Xcom_Shutdown flag */
418 executed_msg = null_synode; /* The message we are waiting to execute */
419 max_synode = null_synode; /* Max message number seen so far */
420 boot = NULL;
421 detector = NULL;
422 killer = NULL;
423 net_boot = NULL;
424 net_recover = NULL;
425 xcom_thread_input = 0;
426 }
427
/* File-local task handles; init_tasks() resets all of them to NULL. */
static task_env *executor = NULL;
static task_env *sweeper = NULL;
static task_env *retry = NULL;
static task_env *proposer[PROPOSERS]; /* one slot per proposer task */
static task_env *alive_t = NULL;

static uint32_t my_id = 0; /* Unique id of this instance (set by init_xcom_base() via new_id()) */
static synode_no current_message; /* Current message number */
static synode_no last_config_modification_id; /*Last configuration change proposal*/
437
get_current_message()438 synode_no get_current_message()
439 {
440 return current_message;
441 }
442
443 static channel prop_input_queue; /* Proposer task input queue */
444
445 /* purecov: begin deadcode */
get_prop_input_queue()446 channel *get_prop_input_queue()
447 {
448 return & prop_input_queue;
449 }
450 /* purecov: end */
451
/* Boot state flags declared extern here; init_xcom_base() and
   start_run_tasks() assign them. */
extern int client_boot_done;
extern int netboot_ok;
extern int booting;

extern start_t start_type;
/* Statically initialised as an empty circular list (both links point to
   itself); init_xcom_base() re-initialises it with link_init(). */
static linkage exec_wait = {0,&exec_wait, &exec_wait}; /* Executor will wake up tasks sleeping here */

/*
#define IGNORE_LOSERS
*/

#define BUILD_TIMEOUT 3.0

/* Ring buffer of group ids recorded by bury_site() and queried by
   is_dead_site(). */
#define MAX_DEAD 10
static struct {
  int n;                      /* next slot to overwrite, kept in [0, MAX_DEAD) */
  unsigned long id[MAX_DEAD]; /* recorded ids; 0 marks a never-used slot */
} dead_sites;
470
get_max_synode()471 synode_no get_max_synode()
472 {
473 return max_synode;
474 }
475
476 static
synode_set_to_event_horizon(synode_no * s)477 void synode_set_to_event_horizon(synode_no *s)
478 {
479 s->msgno += event_horizon + 1;
480 s->node= 0;
481 }
482
483
484 /**
485 Set node group
486 */
set_group(uint32_t id)487 void set_group(uint32_t id)
488 {
489 MAY_DBG(FN; STRLIT("changing group id of global variables "); NDBG(id,lx););
490 /* set_group_id(id); */
491 current_message.group_id = id;
492 executed_msg.group_id = id;
493 max_synode.group_id = id;
494 set_log_group_id(id);
495 }
496
497
bury_site(uint32_t id)498 static void bury_site(uint32_t id)
499 {
500 if (id != 0) {
501 dead_sites.id[dead_sites.n % MAX_DEAD] = id;
502 dead_sites.n = (dead_sites.n + 1) % MAX_DEAD;
503 }
504 }
505
506
is_dead_site(uint32_t id)507 static bool_t is_dead_site(uint32_t id)
508 {
509 int i = 0;
510 for (i = 0; i < MAX_DEAD; i++) {
511 if (dead_sites.id[i] == id)
512 return TRUE;
513 else if (dead_sites.id[i] == 0)
514 return FALSE;
515 }
516 return FALSE;
517 }
518
extern node_set *init_node_set(node_set *set, u_int n);
extern node_set *alloc_node_set(node_set *set, u_int n);

/* Disabled with #if 0: kept for reference, not compiled. */
#if 0
/* Find our previous message number. */
static synode_no decr_msgno(synode_no msgno)
{
  synode_no ret = msgno;
  ret.msgno--;
  ret.node = get_nodeno(find_site_def(ret)); /* In case site and node number has changed */
  return ret;
}
#endif
532
533 /* Find our next message number. */
incr_msgno(synode_no msgno)534 static synode_no incr_msgno(synode_no msgno)
535 {
536 synode_no ret = msgno;
537 ret.msgno++;
538 ret.node = get_nodeno(find_site_def(ret)); /* In case site and node number has changed */
539 return ret;
540 }
541
/* Disabled with #if 0: kept for reference, not compiled. */
#if 0
/* Given message number, compute which node it belongs to */
static unsigned int msgno_to_node(synode_no msgno)
{
  return msgno.node;
}
#endif
549
/**
  Return the synode that follows `synode`: the next node of the same
  message number, or node 0 of the next message number when the node
  number reaches get_maxnodes() of the site covering `synode`.
*/
synode_no incr_synode(synode_no synode)
{
  synode_no next = synode;

  if (++next.node >= get_maxnodes(find_site_def(synode))) {
    next.node = 0;
    next.msgno++;
  }
  return next; /* Change this if we change message number type */
}
561
562
/**
  Return the synode that precedes `synode`. From node 0, step back to the
  last node (get_maxnodes() - 1) of the previous message number; the site
  is looked up with the already decremented message number.
*/
synode_no decr_synode(synode_no synode)
{
  synode_no prev = synode;

  if (prev.node != 0) {
    prev.node--;
  } else {
    prev.msgno--;
    prev.node = get_maxnodes(find_site_def(prev));
    prev.node--;
  }
  return prev; /* Change this if we change message number type */
}
573
574
/** Rewrite p in place as a learn message carrying the no_op value. */
static void skip_value(pax_msg *p)
{
  MAY_DBG(FN; SYCEXP(p->synode));
  p->msg_type = no_op;
  p->op = learn_op;
}
581
582
583 /* }}} */
584
585 /* {{{ Utilities and debug */
586
587 /* purecov: begin deadcode */
588 /* Print message and exit */
pexitall(int i)589 static void pexitall(int i)
590 {
591 int *r = (int*)calloc(1, sizeof(int));
592 *r = i;
593 DBGOUT(FN; NDBG(i, d); STRLIT("time "); NDBG(task_now(), f); );
594 XCOM_FSM(xa_terminate, int_arg(i)); /* Tell xcom to stop */
595 }
596 /* purecov: end */
597
#ifndef WIN
/**
  Set the disposition of signal `signum` to SIG_IGN.

  @param signum  signal number to ignore
  @return the result of sigaction(): 0 on success, -1 with errno set on
          failure (e.g. an invalid signal number)
*/
static int ignoresig(int signum)
{
  struct sigaction act;

  memset(&act, 0, sizeof(act));
  act.sa_handler = SIG_IGN;
  /* POSIX requires sigemptyset() to initialise a sigset_t; all-zero bytes
     are not guaranteed to be an empty set on every platform. */
  sigemptyset(&act.sa_mask);

  /* The previous disposition is not needed. */
  return sigaction(signum, &act, NULL);
}
#else
#define SIGPIPE 0
/* No POSIX signals to ignore on Windows; always reports success. */
static int ignoresig(int signum)
{
  (void)signum;
  return 0;
}
#endif
620
621 /* }}} */
622
/* Disabled with #if 0: debug helper that printed a Paxos machine together
   with a message. Kept for reference, not compiled. */
#if 0
static void dbg_machine_and_msg(pax_machine *p, pax_msg *pm)
{
  GET_GOUT;
  STRLIT("machine ");
  ADD_GOUT(dbg_pax_machine(p));
  STRLIT(" ");
  STRLIT("msg ");
  COPY_AND_FREE_GOUT(dbg_pax_msg(pm));
  PRINT_GOUT;
  FREE_GOUT;
}


#endif
638
/**
  Nonzero if p was modified recently, i.e. last_modified + 0.5 +
  median_time() is still later than task_now(). A machine that has never
  been modified (last_modified == 0.0) is never considered active.
*/
static int recently_active(pax_machine *p)
{
  MAY_DBG(FN;
          SYCEXP(p->synode); STRLIT(" op ");
          PTREXP(p); STRLIT(p->learner.msg ? pax_op_to_str(p->learner.msg->op) : "NULL");
          NDBG(p->last_modified, f); NDBG(task_now(), f));
  if (p->last_modified == 0.0)
    return 0;
  return (p->last_modified + 0.5 + median_time()) > task_now();
}
647
648
/** Nonzero if p's learner holds a learn_op or tiny_learn_op message. */
static inline int finished(pax_machine *p)
{
  MAY_DBG(FN;
          SYCEXP(p->synode); STRLIT(" op ");
          PTREXP(p); STRLIT(p->learner.msg ? pax_op_to_str(p->learner.msg->op) : "NULL");
          );
  if (!p->learner.msg)
    return 0;
  switch (p->learner.msg->op) {
  case learn_op:
  case tiny_learn_op:
    return 1;
  default:
    return 0;
  }
}
657
658
/** Non-static wrapper around finished(). */
int pm_finished(pax_machine *p)
{
  int const done = finished(p);
  return done;
}
663
664
/** Nonzero if p's acceptor holds a message whose op is not initial_op. */
static inline int accepted(pax_machine *p)
{
  MAY_DBG(FN;
          SYCEXP(p->synode); STRLIT(" op ");
          PTREXP(p); STRLIT(p->acceptor.msg ? pax_op_to_str(p->acceptor.msg->op) : "NULL");
          );
  if (p->acceptor.msg == NULL)
    return 0;
  return p->acceptor.msg->op != initial_op;
}
673
674
/** Nonzero if p has accepted a message whose msg_type is no_op. */
static inline int accepted_noop(pax_machine *p)
{
  MAY_DBG(FN;
          SYCEXP(p->synode); STRLIT(" op ");
          PTREXP(p); STRLIT(p->acceptor.msg ? pax_op_to_str(p->acceptor.msg->op) : "NULL");
          );
  if (!accepted(p))
    return 0;
  return p->acceptor.msg->msg_type == no_op;
}
683
684
/** Nonzero if pm is a no_op and p has already accepted a no_op. */
static inline int noop_match(pax_machine *p, pax_msg *pm)
{
  if (pm->msg_type != no_op)
    return 0;
  return accepted_noop(p);
}
689
690
/**
  Nonzero if any Paxos activity has happened on p: its op moved past
  initial_op, a promise was made, a proposal was started, a value was
  accepted, or a value was learned.
*/
static inline int started(pax_machine *p)
{
  if (p->op != initial_op)
    return 1;
  if (p->acceptor.promise.cnt > 0)
    return 1;
  if (p->proposer.msg && p->proposer.msg->op != initial_op)
    return 1;
  if (accepted(p))
    return 1;
  return finished(p);
}
700
701
702 /* }}} */
703
/**
  Remember the synode of the last configuration change received.

  @param received_config_change  synode of that configuration change
*/
void set_last_received_config(synode_no received_config_change)
{
  synode_no const latest = received_config_change;
  last_config_modification_id = latest;
}
708
709 /* {{{ Definition of majority */
max_check(site_def const * site)710 static inline node_no max_check(site_def const *site)
711 {
712 #ifdef MAXACCEPT
713 return MIN(get_maxnodes(site), MAXACCEPT);
714 #else
715 return get_maxnodes(site);
716 #endif
717 }
718
719 static site_def * forced_config = 0;
720
721 /* Definition of majority */
majority(bit_set const * nodeset,site_def const * s,int all,int delay MY_ATTRIBUTE ((unused)),int force)722 static inline int majority(bit_set const *nodeset, site_def const *s, int all, int delay MY_ATTRIBUTE((unused)), int force)
723 {
724 node_no ok = 0;
725 node_no i = 0;
726 int retval = 0;
727 node_no max = max_check(s);
728
729 /* DBGOUT(FN; NDBG(max,lu); NDBG(all,d); NDBG(delay,d); NDBG(force,d)); */
730
731 /* Count nodes that has answered */
732 for (i = 0; i < max; i++) {
733 if (BIT_ISSET(i, nodeset)) {
734 ok++;
735 }
736 }
737
738 /* If we are forcing messages, attempt to ensure consistency by
739 requiring all remaining nodes to agree. Forced_config points to
740 the config that should be used as acceptors in this
741 case. Another possibility is to use the original config and
742 count the number of live nodes, but since the force flag is
743 being used only to force a new config, it seems safer to use
744 the new config and no time-dependent info. Note that we are
745 counting the answers based on the normal config, but use the
746 number of nodes from forced_config. This is safe, since we can
747 assume that the nodes that are not in forced_config will never
748 answer. */
749
750 if(force){
751 DBGOUT(FN; STRLIT("force majority"); NDBG(ok ,u); NDBG(max ,u); NDBG(get_maxnodes(forced_config),u));
752 return ok == get_maxnodes(forced_config);
753 }else{
754 /* Have now seen answer from all live nodes */
755 retval = all ? ok == max : ok > max / 2
756 || (ARBITRATOR_HACK && (2 == max));
757 /* DBGOUT(FN; NDBG(max,lu); NDBG(all,d); NDBG(delay,d); NDBG(retval,d)); */
758 return retval;
759 }
760 }
761
762
763 #define IS_CONS_ALL(p) ((p)->proposer.msg->a ? (p)->proposer.msg->a->consensus == cons_all : 0)
764
765 /* See if a majority of acceptors have answered our prepare */
prep_majority(site_def const * site,pax_machine * p)766 static int prep_majority(site_def const * site, pax_machine *p)
767 {
768
769 int ok = 0;
770
771 assert(p);
772 assert(p->proposer.prep_nodeset);
773 assert(p->proposer.msg);
774 /* DBGOUT(FN; BALCEXP(p->proposer.bal)); */
775 ok = majority(p->proposer.prep_nodeset, site, IS_CONS_ALL(p), p->proposer.bal.cnt == 1, p->proposer.msg->force_delivery || p->force_delivery);
776 return ok;
777 }
778
779 /* See if a majority of acceptors have answered our propose */
prop_majority(site_def const * site,pax_machine * p)780 static int prop_majority(site_def const * site, pax_machine *p)
781 {
782 int ok = 0;
783
784 assert(p);
785 assert(p->proposer.prop_nodeset);
786 assert(p->proposer.msg);
787 /* DBGOUT(FN; BALCEXP(p->proposer.bal)); */
788 ok = majority(p->proposer.prop_nodeset, site, IS_CONS_ALL(p), p->proposer.bal.cnt == 1, p->proposer.msg->force_delivery || p->force_delivery);
789 return ok;
790 }
791
792 /* }}} */
793
794 /* {{{ Xcom thread */
795
796 /* purecov: begin deadcode */
797 /* Xcom thread start function */
xcom_thread_main(gpointer cp)798 gpointer xcom_thread_main(gpointer cp)
799 {
800 G_MESSAGE("Starting xcom on port %d", atoi((char *)cp));
801 xcom_thread_init();
802 /* Initialize task system and enter main loop */
803 taskmain((xcom_port)atoi((char *)cp));
804 /* Xcom is finished when we get here */
805 DBGOUT(FN; STRLIT("Deconstructing xcom thread"));
806 xcom_thread_deinit();
807 G_MESSAGE("Exiting xcom thread");
808 return NULL;
809 }
810 /* purecov: end */
811 static site_def const * executor_site = 0;
812
get_executor_site()813 site_def const * get_executor_site()
814 {
815 return executor_site;
816 }
817
818 static site_def *proposer_site = 0;
819
get_proposer_site()820 site_def const *get_proposer_site()
821 {
822 return proposer_site;
823 }
824
825
/**
  Reset the xcom core state: shutdown flag, message numbers, boot flags and
  start type; then initialise recovery state, draw a new instance id, push a
  NULL site definition, reset the cache variables, the median filter and the
  executor wait queue, and clear the cached executor/proposer sites.
*/
void init_xcom_base()
{
  xcom_shutdown = 0;
  current_message = null_synode;
  executed_msg = null_synode;
  max_synode = null_synode;
  client_boot_done = 0;
  netboot_ok = 0;
  booting = 0;
  start_type = IDLE;

  xcom_recover_init();
  my_id = new_id(); /* new unique id for this instance */
  push_site_def(NULL); /* NOTE(review): meaning of a NULL site is defined by site_def code */
  /* update_servers(NULL); */
  xcom_cache_var_init();
  median_filter_init();
  link_init(&exec_wait, type_hash("task_env")); /* empty list of waiting tasks */
  executor_site = 0;
  proposer_site = 0;
}
847
init_tasks()848 static void init_tasks()
849 {
850 set_task(&boot, NULL);
851 set_task(&net_boot, NULL);
852 set_task(&net_recover, NULL);
853 set_task(&killer, NULL);
854 set_task(&executor, NULL);
855 set_task(&retry, NULL);
856 set_task(&detector, NULL);
857 init_proposers();
858 set_task(&alive_t, NULL);
859 set_task(&sweeper, NULL);
860 }
861
862
/* Initialize the xcom thread */
/**
  Initialise all xcom state before entering the task loop. The calls run in
  order: signal setup, global/site/crc32c/PRNG initialisation, xcom base
  state, task handles, cache, proposer input queue, link list, and finally
  the task system.
*/
void xcom_thread_init()
{
#ifndef NO_SIGPIPE
  /* SIGPIPE's default action terminates the process; ignore it so that
     writes to a broken connection only report an error. */
  signal(SIGPIPE, SIG_IGN);
#endif
  init_base_vars();
  init_site_vars();
  init_crc32c();
  my_srand48((long int)task_now()); /* seed the PRNG from the task clock */

  init_xcom_base();
  init_tasks();
  init_cache();

  /* Initialize input queue */
  channel_init(&prop_input_queue, type_hash("msg_link"));
  init_link_list();
  task_sys_init();
}
883
884
885 /* Empty the proposer input queue */
empty_prop_input_queue()886 static void empty_prop_input_queue()
887 {
888 empty_msg_channel(&prop_input_queue);
889 MAY_DBG(FN; STRLIT("prop_input_queue empty"));
890 }
891
892
/* De-initialize the xcom thread */
/**
  Tear down state set up for the xcom thread: drain the proposer input
  queue, empty the link free list, de-initialise the cache and call
  garbage_collect_servers().
*/
void xcom_thread_deinit()
{
  DBGOUT(FN; STRLIT("Empty proposer input queue"));
  empty_prop_input_queue();
  DBGOUT(FN; STRLIT("Empty link free list"));
  empty_link_free_list();
  DBGOUT(FN; STRLIT("De-initialize cache"));
  deinit_cache();
  garbage_collect_servers();
}
904
905 #define PROP_ITER int i; for(i = 0; i < PROPOSERS; i++)
906
907 static bool_t force_recover = FALSE;
908 /* purecov: begin deadcode */
must_force_recover()909 bool_t must_force_recover()
910 {
911 return force_recover;
912 }
913
914
set_force_recover(bool_t const x)915 void set_force_recover(bool_t const x)
916 {
917 force_recover = x;
918 }
919 /* purecov: end */
920
init_proposers()921 static void init_proposers()
922 {
923 PROP_ITER {
924 set_task(&proposer[i], NULL);
925 }
926 }
927
928
create_proposers()929 static void create_proposers()
930 {
931 PROP_ITER {
932 set_task(&proposer[i], task_new(proposer_task, int_arg(i), "proposer_task", XCOM_THREAD_DEBUG));
933 }
934 }
935
936
terminate_proposers()937 static void terminate_proposers()
938 {
939 PROP_ITER {
940 task_terminate(proposer[i]);
941 }
942 }
943
free_forced_config_site_def()944 static void free_forced_config_site_def()
945 {
946 free_site_def(forced_config);
947 forced_config= NULL;
948 }
949
#if TASK_DBUG_ON
static void dbg_proposers() MY_ATTRIBUTE((unused));
/** Debug helper: print PROPOSERS and every proposer task handle. */
static void dbg_proposers()
{
  GET_GOUT;
  NDBG(PROPOSERS, d);
  {
    int i; /* name kept: PPUT may stringify its argument */
    for (i = 0; i < PROPOSERS; i++)
      PPUT(proposer[i]);
  }
  PRINT_GOUT;
  FREE_GOUT;
}
#endif
965
set_proposer_startpoint()966 static void set_proposer_startpoint()
967 {
968 DBGOHK(FN; STRLIT("changing current message"));
969 if (max_synode.msgno <= 1)
970 set_current_message(first_free_synode(max_synode));
971 else
972 set_current_message(incr_msgno(first_free_synode(max_synode)));
973 }
974
975
976
/** Placeholder hook; intentionally does nothing. */
void check_tasks()
{
  /* nothing to check */
}
980
981
982 /* }}} */
983
984 /* {{{ Task functions */
985 /* purecov: begin deadcode */
986 /* Match any port */
yes(xcom_port port MY_ATTRIBUTE ((unused)))987 static int yes(xcom_port port MY_ATTRIBUTE((unused)))
988 {
989 return 1;
990 }
991
992 /* Create tasks and enter the task main loop */
taskmain(xcom_port listen_port)993 int taskmain(xcom_port listen_port)
994 {
995 init_xcom_transport(listen_port);
996 set_port_matcher(yes); /* For clients that use only addr, not addr:port */
997
998 MAY_DBG(FN; STRLIT("enter taskmain"));
999 ignoresig(SIGPIPE);
1000
1001 {
1002 result fd = {0,0};
1003
1004 if ((fd = announce_tcp(listen_port)).val < 0) {
1005 MAY_DBG(FN; STRLIT("cannot annonunce tcp "); NDBG(listen_port, d));
1006 task_dump_err(fd.funerr);
1007 g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
1008 }
1009
1010 MAY_DBG(FN; STRLIT("Creating tasks"));
1011 task_new(generator_task, null_arg, "generator_task", XCOM_THREAD_DEBUG);
1012 task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
1013 /* task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG); */
1014 /* task_new(xcom_statistics, null_arg, "xcom_statistics", XCOM_THREAD_DEBUG); */
1015 /* task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG); */
1016 MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
1017 }
1018
1019 task_loop();
1020
1021 MAY_DBG(FN; STRLIT(" exit"));
1022 return 1;
1023 }
1024
1025
/**
  Mark booting as finished and start the worker tasks. The proposer start
  point is computed before the PROPOSERS proposer tasks are created; then
  the executor, sweeper, detector and alive tasks are started and their
  handles stored.
*/
void start_run_tasks()
{
  force_recover = 0;
  client_boot_done = 1;
  netboot_ok = 1;
  booting = 0;
  set_proposer_startpoint(); /* sets current_message before proposers exist */
  create_proposers();
  set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
  set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
  set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
  set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));
}
1039
1040 /* Create tasks and enter the task main loop */
xcom_taskmain(xcom_port listen_port)1041 int xcom_taskmain(xcom_port listen_port)
1042 {
1043 init_xcom_transport(listen_port);
1044
1045 MAY_DBG(FN; STRLIT("enter taskmain"));
1046 ignoresig(SIGPIPE);
1047
1048 {
1049 result fd = {0,0};
1050 if ((fd = announce_tcp(listen_port)).val < 0) {
1051 MAY_DBG(FN; STRLIT("cannot annonunce tcp "); NDBG(listen_port, d));
1052 task_dump_err(fd.funerr);
1053 g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
1054 pexitall(1);
1055 }
1056
1057 MAY_DBG(FN; STRLIT("Creating tasks"));
1058 /* task_new(generator_task, null_arg, "generator_task", XCOM_THREAD_DEBUG); */
1059 task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
1060 task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);
1061 /* task_new(xcom_statistics, null_arg, "xcom_statistics", XCOM_THREAD_DEBUG); */
1062 /* task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG); */
1063 MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
1064 }
1065
1066 start_run_tasks();
1067 task_loop();
1068
1069 MAY_DBG(FN; STRLIT(" exit"));
1070 return 1;
1071 }
1072 /* purecov: end */
1073 static xcom_state_change_cb xcom_run_cb = 0;
1074 static xcom_state_change_cb xcom_terminate_cb = 0;
1075 static xcom_state_change_cb xcom_comms_cb = 0;
1076 static xcom_state_change_cb xcom_exit_cb = 0;
1077 static xcom_state_change_cb xcom_expel_cb = 0;
1078
set_xcom_run_cb(xcom_state_change_cb x)1079 void set_xcom_run_cb(xcom_state_change_cb x)
1080 {
1081 xcom_run_cb = x;
1082 }
1083
set_xcom_comms_cb(xcom_state_change_cb x)1084 void set_xcom_comms_cb(xcom_state_change_cb x)
1085 {
1086 xcom_comms_cb = x;
1087 }
1088 /* purecov: begin deadcode */
set_xcom_terminate_cb(xcom_state_change_cb x)1089 void set_xcom_terminate_cb(xcom_state_change_cb x)
1090 {
1091 xcom_terminate_cb = x;
1092 }
1093 /* purecov: end */
set_xcom_exit_cb(xcom_state_change_cb x)1094 void set_xcom_exit_cb(xcom_state_change_cb x)
1095 {
1096 xcom_exit_cb = x;
1097 }
1098
set_xcom_expel_cb(xcom_state_change_cb x)1099 void set_xcom_expel_cb(xcom_state_change_cb x)
1100 {
1101 xcom_expel_cb = x;
1102 }
1103
/*
  Start the XCom transport and run the task loop. The outcome of announcing
  the TCP port is reported through the optional callbacks.

  On failure:
  - the comms hook (if set) receives XCOM_COMMS_ERROR;
  - the terminate hook (if set) is called with 0;
  - 1 is returned without entering the task loop.

  On success:
  - the comms hook receives XCOM_COMMS_OK;
  - the tcp_server and tcp_reaper tasks are created and task_loop() runs;
  - afterwards SSL (when XCOM_HAVE_OPENSSL) and the thread state are cleaned up.

  @param listen_port  port to listen on
  @return 1 in both cases
*/
int xcom_taskmain2(xcom_port listen_port)
{
  init_xcom_transport(listen_port);

  MAY_DBG(FN; STRLIT("enter taskmain"));
  ignoresig(SIGPIPE);

  {
    result fd = announce_tcp(listen_port);
    int port_ok = fd.val >= 0;

    if (!port_ok) {
      MAY_DBG(FN; STRLIT("cannot annonunce tcp "); NDBG(listen_port, d));
      task_dump_err(fd.funerr);
      g_critical("Unable to announce tcp port %d. Port already in use?", listen_port);
    }

    if (xcom_comms_cb) {
      xcom_comms_cb(port_ok ? XCOM_COMMS_OK : XCOM_COMMS_ERROR);
    }

    if (!port_ok) {
      if (xcom_terminate_cb) {
        xcom_terminate_cb(0);
      }
      return 1;
    }

    MAY_DBG(FN; STRLIT("Creating tasks"));
    task_new(tcp_server, int_arg(fd.val), "tcp_server", XCOM_THREAD_DEBUG);
    task_new(tcp_reaper_task, null_arg, "tcp_reaper_task", XCOM_THREAD_DEBUG);
    MAY_DBG(FN; STRLIT("XCOM is listening on "); NPUT(listen_port, d));
  }

  task_loop();

#if defined(XCOM_HAVE_OPENSSL)
  xcom_cleanup_ssl();
#endif

  MAY_DBG(FN; STRLIT(" exit"));
  xcom_thread_deinit();
  return 1;
}
1149
1150
1151 /* {{{ Paxos message construction and sending */
1152
1153 /* Initialize a message for sending */
prepare(pax_msg * p,pax_op op)1154 static void prepare(pax_msg *p, pax_op op)
1155 {
1156 p->op = op;
1157 p->reply_to = p->proposal;
1158 }
1159
1160
1161 /* Initialize a prepare_msg */
prepare_msg(pax_msg * p)1162 static int prepare_msg(pax_msg *p)
1163 {
1164 prepare(p, prepare_op);
1165 /* p->msg_type = normal; */
1166 return send_to_acceptors(p, "prepare_msg");
1167 }
1168
1169
1170 /* Initialize a noop_msg */
create_noop(pax_msg * p)1171 static pax_msg *create_noop(pax_msg *p)
1172 {
1173 prepare(p, prepare_op);
1174 p->msg_type = no_op;
1175 return p;
1176 }
1177
1178
1179 /* Initialize a read_msg */
create_read(site_def const * site,pax_msg * p)1180 static pax_msg *create_read(site_def const * site, pax_msg *p)
1181 {
1182 p->msg_type = normal;
1183 p->proposal.node = get_nodeno(site);
1184 prepare(p, read_op);
1185 return p;
1186 }
1187
1188
/* Turn p into a no_op skip_op message and send it to all nodes.
   Returns the result of send_to_all(). */
static int skip_msg(pax_msg *p)
{
  prepare(p, skip_op);
  p->msg_type = no_op;
  MAY_DBG(FN; STRLIT("skipping message "); SYCEXP(p->synode));
  return send_to_all(p, "skip_msg");
}
1196
1197
/* Stamp the first app_data of p (if any) with p's synode. This sets
   app_key.msgno, app_key.node, app_key.group_id and group_id. */
static void brand_app_data(pax_msg *p)
{
  app_data_ptr data = p->a;

  if (!data) {
    return;
  }
  data->app_key.msgno = p->synode.msgno;
  data->app_key.node = p->synode.node;
  data->group_id = p->synode.group_id;
  data->app_key.group_id = data->group_id;
}
1206
1207
/* Return a copy of synode whose group_id is replaced by this instance's
   my_id. According to the original comment, my_id is derived from the node
   number and a timestamp. It must be nonzero. */
static synode_no my_unique_id(synode_no synode)
{
  synode_no tagged = synode;
  assert(my_id != 0);
  tagged.group_id = my_id;
  return tagged;
}
1215
1216
/* Set unique_id to synode on every app_data in msg's ->next-linked list. */
static void set_unique_id(pax_msg *msg, synode_no synode)
{
  app_data_ptr cur;
  for (cur = msg->a; cur != 0; cur = cur->next) {
    cur->unique_id = synode;
  }
}
1225
1226
/* Turn p into an accept_op message that replies to its own proposal, brand
   its app data with p's synode, and send it to the acceptors. */
static int propose_msg(pax_msg *p)
{
  prepare(p, accept_op);
  brand_app_data(p);
  return send_to_acceptors(p, "propose_msg");
}
1235
1236
/* Make p a learn_op message. It is normal when it carries app data and
   no_op otherwise. */
static void set_learn_type(pax_msg *p)
{
  if (p->a) {
    p->msg_type = normal;
  } else {
    p->msg_type = no_op;
  }
  p->op = learn_op;
}
1242
1243 /* purecov: begin deadcode */
/* Convert p into a learn message (see set_learn_type), reply to its own
   proposal, brand its app data, and send it to every node of site. */
static int learn_msg(site_def const * site, pax_msg *p)
{
  int sent;

  set_learn_type(p);
  p->reply_to = p->proposal;
  brand_app_data(p);
  MAY_DBG(FN;
          dbg_bitset(p->receivers, get_maxnodes(site));
          );
  sent = send_to_all_site(site, p, "learn_msg");
  return sent;
}
1254 /* purecov: end */
/*
  Broadcast a tiny_learn_op for p to every node of site.
  The message sent is a clone of p without app data. Its msg_type reflects
  whether p itself carries app data, and reply_to holds the proposer ballot
  of the cached Paxos machine for p->synode. The clone is referenced for the
  duration of the send and released afterwards.
*/
static int tiny_learn_msg(site_def const *site, pax_msg *p)
{
  pax_msg *tiny = clone_pax_msg_no_app(p);
  pax_machine *machine = get_cache(p->synode);
  int sent;

  ref_msg(tiny);
  tiny->op = tiny_learn_op;
  tiny->msg_type = p->a ? normal : no_op;
  tiny->reply_to = machine->proposer.bal;
  brand_app_data(tiny);
  MAY_DBG(FN;
          dbg_bitset(tiny->receivers, get_maxnodes(site));
          );
  sent = send_to_all_site(site, tiny, "tiny_learn_msg");
  unref_msg(&tiny);
  return sent;
}
1273
1274
1275 /* }}} */
1276
1277 /* {{{ Proposer task */
1278
/*
  Pick a fresh ballot for a three-phase push of msg at msgno.
  The ballot is owned by this node and its count is one higher than both the
  proposer's current count and the acceptor's promised count. The ballot and
  msgno are copied into msg.
*/
static void prepare_push_3p(site_def const * site, pax_machine *p, pax_msg *msg, synode_no msgno)
{
  MAY_DBG(FN;
          SYCEXP(msgno);
          NDBG(p->proposer.bal.cnt, d); NDBG(p->acceptor.promise.cnt, d));
  p->proposer.bal.node = get_nodeno(site);
  {
    int highest = p->proposer.bal.cnt;
    if (p->acceptor.promise.cnt > highest) {
      highest = p->acceptor.promise.cnt;
    }
    p->proposer.bal.cnt = highest + 1;
  }
  msg->synode = msgno;
  msg->proposal = p->proposer.bal;
}
1292
1293
/*
  Two-phase push: skip the prepare round and propose p->proposer.msg directly.
  The ballot count is reset to 0 and the ballot is owned by this node of
  site. The ballot, synode and force_delivery flag are copied into the
  message before it is proposed. p->proposer.msg must be set.
*/
static void push_msg_2p(site_def const * site, pax_machine *p)
{
  pax_msg *msg = p->proposer.msg;
  assert(msg);

  BIT_ZERO(p->proposer.prop_nodeset);
  MAY_DBG(FN; SYCEXP(p->synode));
  p->proposer.bal.node = get_nodeno(site);
  p->proposer.bal.cnt = 0;
  msg->proposal = p->proposer.bal;
  msg->synode = p->synode;
  msg->force_delivery = p->force_delivery;
  propose_msg(msg);
}
1307
1308
/*
  Three-phase push: choose a new ballot for msgno, tag msg with msg_type and
  the machine's force_delivery flag, clear the prepare reply set, and send
  msg as a prepare message. msgno.msgno must be nonzero.
*/
static void push_msg_3p(site_def const * site, pax_machine *p, pax_msg *msg, synode_no msgno, pax_msg_type msg_type)
{
  assert(msgno.msgno != 0);
  prepare_push_3p(site, p, msg, msgno);
  msg->msg_type = msg_type;
  msg->force_delivery = p->force_delivery;
  BIT_ZERO(p->proposer.prep_nodeset);
  assert(p->proposer.msg);
  prepare_msg(msg);
  MAY_DBG(FN; BALCEXP(msg->proposal);
          SYCEXP(msgno); STRLIT(" op "); STRLIT(pax_op_to_str(msg->op)));
}
1321
1322
1323 /* Brand client message with unique ID */
brand_client_msg(pax_msg * msg,synode_no msgno)1324 static void brand_client_msg(pax_msg *msg, synode_no msgno)
1325 {
1326 assert(!synode_eq(msgno, null_synode));
1327 set_unique_id(msg, my_unique_id(msgno));
1328 }
1329
1330 /* purecov: begin deadcode */
/* Nonzero when a send must be refused. This happens only when r is not
   rec_send, xcom has booted, and there is either no site or the site lacks
   enough live nodes. */
static int reject_send(site_def const * site, recover_action r)
{
  if (r == rec_send || !xcom_booted()) {
    return 0;
  }
  return !site || !enough_live_nodes(site);
}
1335 /* purecov: end */
1336
/* Attach payload a to msg, mark it as a client_msg, and queue it on
   prop_input_queue for the proposer tasks. */
void xcom_send(app_data_ptr a, pax_msg *msg)
{
  msg_link *link;

  MAY_DBG(FN; PTREXP(a); SYCEXP(a->app_key); SYCEXP(msg->synode));
  msg->a = a;
  msg->op = client_msg;
  link = msg_link_new(msg, VOID_NODE_NO);
  MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_pax_msg(msg)));
  channel_put(&prop_input_queue, &link->l);
}
1348
1349 /* purecov: begin deadcode */
/*
  Legacy generator task (inside a purecov deadcode region).

  NOTE(review): `a` is reset to 0 at the top of every iteration and nothing
  assigns it before `while (a)`, so the loop body never runs as written. The
  task calls check_tasks() once and then repeatedly executes TASK_DELAY(0.1).
  The body is kept as a record of how exit, reset and ordinary app_data items
  were meant to be handled.
*/
static int generator_task(task_arg arg MY_ATTRIBUTE((unused)))
{
  DECL_ENV
  int dummy;
  END_ENV;

  TASK_BEGIN
  MAY_DBG(FN; );
  check_tasks(); /* Start tasks which should be running (check_tasks() is currently empty) */
  for(;;) {
    app_data_ptr a = 0; /* NOTE(review): never set to a real item; loop below is dead */
    while (a) {
      assert(!(a->chosen && synode_eq(a->app_key, null_synode)));
      MAY_DBG(FN; PTREXP(a); SYCEXP(a->app_key));
      MAY_DBG(FN;
              COPY_AND_FREE_GOUT(dbg_app_data(a));
              );
      if (a->body.c_t == exit_type) {
        /* Exit request: bury the current group, tear down all tasks and
           shared state, flag shutdown and end this task. */
        bury_site(get_group_id(get_site_def()));
        copy_app_data(&a, NULL);
        task_terminate_all(); /* Kill, kill, kill, kill, kill, kill. This is the end. */

        init_xcom_base(); /* Reset shared variables */
        init_tasks(); /* Reset task variables */
        free_site_defs();
        free_forced_config_site_def();
        garbage_collect_servers();
        DBGOUT(FN; STRLIT("shutting down"));
        xcom_shutdown = 1;
        TERMINATE;
      } else if (a->body.c_t == reset_type || a->body.c_t == remove_reset_type) {
        /* Reset request: reset shared state but keep the task system running. */
        if(a->body.c_t == reset_type) /* Not for remove node */
          bury_site(get_group_id(get_site_def()));
        copy_app_data(&a, NULL);
        init_xcom_base(); /* Reset shared variables */
        check_tasks(); /* Stop tasks which should not be running */
        free_site_defs();
        free_forced_config_site_def();
        garbage_collect_servers();
      } else {
        /* Ordinary item: drop it if reject_send() refuses it, otherwise wrap
           it in a new pax_msg and hand it to xcom_send(). */
        if (reject_send(get_site_def(), a->recover)) {
          copy_app_data(&a, NULL);
        } else {
          pax_msg * msg = pax_msg_new(null_synode, get_site_def());
          if (is_real_recover(a)) {
            msg->start_type = RECOVER;
            if (force_recover) {
              /* We are desperate to recover,
                 fake an accepted message with null key */
              DBGOUT(FN; STRLIT("forcing recovery "));
              a->chosen = TRUE;
            }
          }
          xcom_send(a, msg);
        }
      }
    }

    TASK_DELAY(0.1);
  }
  FINALLY
  TASK_END;
}
1413 /* purecov: end */
1414
#define FNVSTART 0x811c9dc5

/*
  32-bit Fowler-Noll-Vo hash in FNV-1 order (multiply by the FNV prime, then
  xor the byte) over `length` bytes at `buf`. Hashing starts from `sum`.
  Passing FNVSTART gives the standard offset basis; passing a previous result
  chains several buffers into one hash.
*/
static uint32_t fnv_hash(unsigned char *buf, size_t length, uint32_t sum)
{
  size_t remaining = length;
  while (remaining > 0) {
    sum = (sum * (uint32_t)0x01000193) ^ (uint32_t)*buf;
    buf++;
    remaining--;
  }
  return sum;
}
1426
/**
  Create a new (hopefully unique) ID. The basic idea is to create a hash from
  the host ID and a timestamp.

  The result is never 0 and never an id that is_dead_site() reports. The
  first candidate is hash(get_unique_long(), task_now()). If it is rejected,
  a retry counter is chained into the hash so that every retry produces a
  different candidate. Recomputing from the same (unchanged) inputs would
  yield the same rejected value forever.
*/
uint32_t new_id()
{
  long id = get_unique_long();
  double timestamp = task_now();
  uint32_t retry = 0;
  uint32_t retval = fnv_hash((unsigned char *) & id, sizeof(id), 0);

  retval = fnv_hash((unsigned char *) & timestamp, sizeof(timestamp), retval);
  /* Avoid returning 0 or an already used site id */
  while (retval == 0 || is_dead_site(retval)) {
    retry++;
    retval = fnv_hash((unsigned char *) & retry, sizeof(retry), retval);
  }
  return retval;
}
1443
getstart(app_data_ptr a)1444 static synode_no getstart(app_data_ptr a)
1445 {
1446 synode_no retval = null_synode;
1447 G_DEBUG("getstart group_id %x", a->group_id);
1448 if (!a || a->group_id == null_id) {
1449 retval.group_id = new_id();
1450 } else {
1451 a->app_key.group_id = a->group_id;
1452 retval = a->app_key;
1453 if (get_site_def() && retval.msgno != 1) {
1454 /* Not valid until after event horizon has been passed */
1455 synode_set_to_event_horizon(&retval);
1456 }
1457 }
1458 return retval;
1459 }
1460
/*
  Install site as the current site definition and apply its side effects:
  - raise max_synode to site->start if the latter is larger;
  - resolve this node's index in site->nodes;
  - push site with push_site_def() (ownership presumably passes to the site
    stack -- TODO confirm);
  - switch the active group to site's group id;
  - when get_maxnodes(get_site_def()) is nonzero, update server connections
    for `operation`;
  - record the install time.

  @param site       the site definition to install
  @param operation  cargo type of the configuration message, forwarded to
                    update_servers()
*/
void site_install_action(site_def *site, cargo_type operation)
{
  DBGOUT(FN; NDBG(get_nodeno(get_site_def()), u));
  if (synode_gt(site->start, max_synode))
    set_max_synode(site->start);
  site->nodeno = xcom_find_node_index(&site->nodes);
  push_site_def(site);
  DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_site_def(site)));
  set_group(get_group_id(site));
  /* Evaluated after push_site_def(), so get_site_def() presumably returns
     the site just installed -- confirm. */
  if(get_maxnodes(get_site_def())){
    update_servers(site, operation);
  }
  site->install_time = task_now();
  DBGOUT(FN; SYCEXP(site->start); SYCEXP(site->boot_key));
  DBGOUT(FN; NDBG(get_nodeno(site), u));
  DBGOUT(SYCEXP(site->start); SYCEXP(site->boot_key); NDBG(site->install_time,f));
  DBGOUT(NDBG(get_nodeno(site), u));
}
1479
create_site_def_with_start(app_data_ptr a,synode_no start)1480 static site_def *create_site_def_with_start(app_data_ptr a, synode_no start)
1481 {
1482 site_def * site = new_site_def();
1483 MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
1484 init_site_def(a->body.app_u_u.nodes.node_list_len,
1485 a->body.app_u_u.nodes.node_list_val, site);
1486 site->start = start;
1487 site->boot_key = a->app_key;
1488 return site;
1489 }
1490
1491
install_ng_with_start(app_data_ptr a,synode_no start)1492 static site_def * install_ng_with_start(app_data_ptr a, synode_no start)
1493 {
1494 if (a) {
1495 site_def *site = create_site_def_with_start(a, start);
1496 site_install_action(site, a->body.c_t);
1497 return site;
1498 }
1499 return 0;
1500 }
1501
1502
install_node_group(app_data_ptr a)1503 site_def *install_node_group(app_data_ptr a)
1504 {
1505 ADD_EVENTS(
1506 add_event(string_arg("a->app_key"));
1507 add_synode_event(a->app_key);
1508 );
1509 if (a)
1510 return install_ng_with_start(a, getstart(a));
1511 else
1512 return 0;
1513 }
1514
1515 /* purecov: begin deadcode */
/* Nonzero when a is an xcom_recover payload with a non-empty message list. */
int is_real_recover(app_data_ptr a)
{
  if (!a || a->body.c_t != xcom_recover) {
    return 0;
  }
  return a->body.app_u_u.rep.msg_list.synode_no_array_len > 0;
}
1520 /* purecov: end */
1521
/* Record synode as max_synode. The assignment is unconditional; callers
   decide whether synode is actually larger. */
void set_max_synode(synode_no synode)
{
  max_synode = synode;
  MAY_DBG(FN; STRLIT("new "); SYCEXP(max_synode));
}
1527
1528 /* purecov: begin deadcode */
/* Broadcast a learn message for `synode` to every node of site. The message
   carries a copy of p's app data and p's start_type. */
static void learn_accepted_value(site_def const * site, pax_msg *p, synode_no synode)
{
  pax_msg *learn = pax_msg_new(synode, site);

  ref_msg(learn);
  learn->start_type = p->start_type;
  copy_app_data(&learn->a, p->a);
  set_learn_type(learn); /* must follow copy_app_data: type depends on ->a */
  MAY_DBG(FN; STRLIT("trying to learn known value "); SYCEXP(synode));
  send_to_all_site(site, learn, "learn_accepted_value");
  unref_msg(&learn);
}
1540 /* purecov: end */
1541
/* Nonzero when a Paxos machine exists for s and it has been started. */
static int is_busy(synode_no s)
{
  pax_machine *machine = hash_get(s);
  return machine ? started(machine) : 0;
}
1551
/*
  Disabled helper: advance msgno past busy slots and return the first free
  one, storing its site definition in *site. proposer_task performs an
  equivalent search inline (without this helper).
*/
#if 0
static synode_no find_slot(synode_no msgno, site_def **site)
{
  assert(!synode_eq(msgno, null_synode));
  while (is_busy(msgno)) {
    msgno = incr_msgno(msgno);
  }
  assert(!synode_eq(msgno, null_synode));
  *site = find_site_def_rw(msgno);
  return msgno;
}
#endif
1564
match_my_msg(pax_msg * learned,pax_msg * mine)1565 bool_t match_my_msg(pax_msg *learned, pax_msg *mine)
1566 {
1567 MAY_DBG(FN; PTREXP(learned->a);
1568 if (learned->a)
1569 SYCEXP(learned->a->unique_id);
1570 PTREXP(mine->a);
1571 if (mine->a)
1572 SYCEXP(mine->a->unique_id);
1573 ) ;
1574 if (learned->a && mine->a) { /* Both have app data, see if data is mine */
1575 return synode_eq(learned->a->unique_id, mine->a->unique_id);
1576 } else if (!(learned->a || mine->a)) { /* None have app data, anything goes */
1577 return TRUE;
1578 } else { /* Definitely mismatch */
1579 return FALSE;
1580 }
1581 }
1582
1583
#if TASK_DBUG_ON
/* Debug-print label s followed by one bit per node of site from bs. Output
   stops at the number of bits actually stored in bs. */
static void dbg_reply_set(site_def const * site, const char *s, bit_set *bs)
{
  unsigned int const node_count = get_maxnodes(site);
  size_t const stored_bits = bs->bits.bits_len * sizeof(*bs->bits.bits_val) * BITS_PER_BYTE;
  unsigned int node;
  GET_GOUT;
  STRLIT(s);
  for (node = 0; node < node_count && node < stored_bits; node++) {
    NPUT(BIT_ISSET(node, bs), d);
  }
  PRINT_GOUT;
  FREE_GOUT;
}
#endif
1598
1599
1600 static void propose_noop(synode_no find, pax_machine *p);
1601
too_far(synode_no s)1602 static inline int too_far(synode_no s)
1603 {
1604 return s.msgno >= executed_msg.msgno + event_horizon;
1605 }
1606
/* Trace the jump target, then jump to label x. The do/while (0) wrapper makes
   `GOTO(x);` a single statement, so it is safe in an unbraced if/else. */
#define GOTO(x) do { DBGOUT(STRLIT("goto "); STRLIT(#x)); goto x; } while (0)
1608
/* Nonzero iff x is the view_msg cargo type. */
static inline int is_view(cargo_type x)
{
  return view_msg == x;
}
1613
/* Nonzero iff x is a configuration-changing cargo type (boot, add node,
   remove node, or forced configuration). */
static inline int is_config(cargo_type x)
{
  switch (x) {
    case unified_boot_type:
    case add_node_type:
    case remove_node_type:
    case force_config_type:
      return 1;
    default:
      return 0;
  }
}
1621
static void terminate_and_exit();

/*
  Proposer task: fetch client messages from prop_input_queue and try to get
  each one accepted by a Paxos instance.

  Each iteration, until xcom_shutdown is set:
  - Take one msg_link from prop_input_queue. Unless its payload is a
    configuration or view message, append further queued payloads (batching)
    while AUTOBATCH holds and the summed app_data_size() stays within
    MAX_BATCH_SIZE.
  - If the payload is already marked chosen (an old message), repeatedly
    call learn_accepted_value() for its app_key until that machine finishes.
  - Otherwise pick the first non-busy slot at or after current_message,
    waiting on exec_wait while incr_msgno() of the candidate is too_far()
    ahead of the executor. Brand the message with that slot, then run the
    3-phase protocol (threephase or force_delivery) or the 2-phase protocol.
    Re-push with 3 phases whenever the current wake-up delay has elapsed.
  - When the slot is decided, keep it if the learned value is ours
    (match_my_msg()); otherwise start over with a new slot.

  @param arg  int_arg holding this proposer's id (stored in ep->self)
*/
static int proposer_task(task_arg arg)
{
  DECL_ENV
  int self; /* ID of this proposer task */
  pax_machine * p; /* Pointer to Paxos instance */
  msg_link * client_msg; /* The client message we are trying to push */
  synode_no msgno;
  pax_msg * prepare_msg;
  double start_propose;
  double start_push;
  double delay;
  site_def const *site;
  size_t size;
  END_ENV;

  TASK_BEGIN

  ep->self = get_int_arg(arg);
  ep->p = NULL;
  ep->client_msg = NULL;
  ep->prepare_msg = NULL;
  ep->start_propose = 0.0;
  ep->start_push = 0.0;
  ep->delay = 0.0;
  ep->msgno = current_message;
  ep->site = 0;
  ep->size = 0;

  MAY_DBG(FN; NDBG(ep->self, d); NDBG(task_now(), f));

  while (!xcom_shutdown) { /* Loop until no more work to do */
    int MY_ATTRIBUTE((unused)) lock = 0;
    /* Wait for client message */
    assert(!ep->client_msg);
    CHANNEL_GET(&prop_input_queue, &ep->client_msg, msg_link);
    MAY_DBG(FN; PTREXP(ep->client_msg->p->a); STRLIT("extracted "); SYCEXP(ep->client_msg->p->a->app_key));

    /* Grab rest of messages in queue as well, but never batch config messages, which need a unique number */

    if(!is_config(ep->client_msg->p->a->body.c_t) && !is_view(ep->client_msg->p->a->body.c_t)){
      ep->size = app_data_size(ep->client_msg->p->a);
      while(AUTOBATCH && ep->size <= MAX_BATCH_SIZE &&
            ! link_empty(&prop_input_queue.data)){ /* Batch payloads into single message */
        msg_link *tmp;
        app_data_ptr atmp;

        CHANNEL_GET(&prop_input_queue, &tmp, msg_link);
        atmp = tmp->p->a;
        ep->size += app_data_size(atmp);
        /* Abort batching if config or too big batch */
        if(is_config(atmp->body.c_t) || is_view(atmp->body.c_t) ||
           ep->size > MAX_BATCH_SIZE){
          /* Put it back at the head so it is the next message handled */
          channel_put_front(&prop_input_queue, &tmp->l);
          break;
        }
        ADD_T_EV(seconds(),__FILE__, __LINE__, "batching");

        tmp->p->a = 0; /* Steal this payload */
        msg_link_delete(&tmp); /* Get rid of the empty message */
        atmp->next = ep->client_msg->p->a; /* Add to list of app_data */
        G_TRACE("Batching %s %s", cargo_type_to_str(ep->client_msg->p->a->body.c_t),
                cargo_type_to_str(atmp->body.c_t));
        ep->client_msg->p->a = atmp; /* Prepend: batched payloads end up in reverse arrival order */
        MAY_DBG(FN; PTREXP(ep->client_msg->p->a); STRLIT("extracted "); SYCEXP(ep->client_msg->p->a->app_key));
      }
    }
    ep->start_propose = task_now();
    ep->delay = 0.0;


    assert(!(AUTOBATCH && ep->client_msg->p->a->chosen));

    /* See if value is known already (old message) */
    if (ep->client_msg->p->a->chosen) {
      DBGOUT(FN; PTREXP(ep->client_msg->p->a); STRLIT("pushing old "); SYCEXP(ep->client_msg->p->a->app_key));
      MAY_DBG(FN;
              COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
              );
      ep->msgno = ep->client_msg->p->a->app_key;
      ep->site = find_site_def(ep->msgno);
      if(!ep->site) /* Use current site if message is too old */
        ep->site = get_site_def();

      /* See if we can do anything with this message */
#if 0
      if (!ep->site || get_nodeno(ep->site) == VOID_NODE_NO) {
        /* Give up */
        deliver_to_app(NULL, ep->client_msg->p->a, delivery_failure);
        GOTO(next);
      }
#endif
    retry_old:
      ep->p = get_cache(ep->msgno);
      assert(ep->p);
      lock = lock_pax_machine(ep->p);
      assert(!lock);

      /* Try to get a value accepted */
      learn_accepted_value(ep->site, ep->client_msg->p, ep->msgno);
      while (!finished(ep->p)) {
        /* Sleep here if value is not already chosen */
        TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
        if (!synode_eq(ep->msgno, ep->p->synode)) {
          /* The cache entry was reused for another synode while we slept.
             NOTE(review): the jump leaves ep->p locked (unlock is commented
             out); presumably safe because the machine was reassigned --
             confirm. */
          DBGOUT(FN; STRLIT("proposer_task detected stolen state machine, retry"); );
          /* unlock_pax_machine(ep->p); */
          GOTO(retry_old);
        }
        assert(synode_eq(ep->msgno, ep->p->synode));
        learn_accepted_value(ep->site, ep->client_msg->p, ep->msgno);
      }
      unlock_pax_machine(ep->p);
      msg_link_delete(&ep->client_msg);
      continue;
    }

    /* It is a new message */

    assert(!synode_eq(current_message, null_synode));

  retry_new:
    /* Find a free slot */

    assert(!synode_eq(current_message, null_synode));
    ep->msgno = current_message;
    while (is_busy(ep->msgno)) {
      while (/* ! ep->client_msg->p->force_delivery && */ too_far(incr_msgno(ep->msgno))) { /* Too far ahead of executor */
        TIMED_TASK_WAIT(&exec_wait, 1.0);
        /* NOTE(review): ep->site here may still refer to the site used for a
           previous message; it is only used for this debug output. */
        DBGOUT(FN; TIMECEXP(ep->start_propose); TIMECEXP(ep->client_msg->p->a->expiry_time); TIMECEXP(task_now());

               NDBG(enough_live_nodes(ep->site), d));
      }
      ep->msgno = incr_msgno(ep->msgno);
    }
    assert(!synode_eq(ep->msgno, null_synode));
    proposer_site = find_site_def_rw(ep->msgno);

    ep->site = proposer_site;

    /* See if we can do anything with this message */
#if 1
    if (!ep->site || get_nodeno(ep->site) == VOID_NODE_NO) {
      /* Give up: no site covers this slot, or this node is not a member of
         it. NOTE(review): at `next:` ep->p may still refer to a machine from
         an earlier message; it is only used for debug output there. */
      deliver_to_app(NULL, ep->client_msg->p->a, delivery_failure);
      GOTO(next);
    }
#endif
    DBGOHK(FN; STRLIT("changing current message"));
    set_current_message(ep->msgno);

    brand_client_msg(ep->client_msg->p, ep->msgno);
    ep->client_msg->p->a->lsn = ep->msgno.msgno;

    for(;;) { /* Loop until the client message has been learned */
      /* Get a Paxos instance to send the client message */
      ep->p = get_cache(ep->msgno);
      assert(ep->p);
      if(ep->client_msg->p->force_delivery)
        ep->p->force_delivery = ep->client_msg->p->force_delivery;
      lock = lock_pax_machine(ep->p);
      assert(!lock);

      /* Set the client message as current proposal */
      assert(ep->client_msg->p);
      replace_pax_msg(&ep->p->proposer.msg, clone_pax_msg(ep->client_msg->p));
      if (ep->p->proposer.msg == NULL) {
        /* clone_pax_msg() failed to allocate */
        g_critical("Node %u has run out of memory while sending a message and "
                   "will now exit.",
                   get_nodeno(proposer_site));
        terminate_and_exit();
        TERMINATE;
      }
      assert(ep->p->proposer.msg);
      PAX_MSG_SANITY_CHECK(ep->p->proposer.msg);

      /* Create the prepare message */
      unchecked_replace_pax_msg(&ep->prepare_msg,
                                pax_msg_new(ep->msgno, ep->site));
      DBGOUT(FN; PTREXP(ep->client_msg->p->a); STRLIT("pushing "); SYCEXP(ep->msgno));
      MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(ep->prepare_msg->a)));

      /* Full 3-phase Paxos when configured or when delivery is forced;
         otherwise try the 2-phase shortcut (ballot count 0). */
      if(threephase || ep->p->force_delivery){
        push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
      }else{
        push_msg_2p(ep->site, ep->p);
      }

      ep->start_push = task_now();

      while (!finished(ep->p)) { /* Try to get a value accepted */
        /* We will wake up periodically, and whenever a message arrives */
        TIMED_TASK_WAIT(&ep->p->rv, ep->delay = wakeup_delay(ep->delay));
        if (!synode_eq(ep->msgno, ep->p->synode) || ep->p->proposer.msg == NULL) {
          /* NOTE(review): leaves ep->p locked (unlock is commented out) --
             confirm this is intended. */
          DBGOHK(FN; STRLIT("detected stolen state machine, retry"); );
          /* unlock_pax_machine(ep->p); */
          GOTO(retry_new); /* Need to break out of both loops,
                              and we have no "exit named loop" construction */
        }
        assert(synode_eq(ep->msgno, ep->p->synode) && ep->p->proposer.msg);
        if (finished(ep->p))
          break;
        {
          double now = task_now();
          /* Re-push with a fresh (higher) ballot once the delay has elapsed */
          if ((ep->start_push + ep->delay) <= now) {
            PAX_MSG_SANITY_CHECK(ep->p->proposer.msg);
            DBGOUT(FN; STRLIT("retry pushing "); SYCEXP(ep->msgno));
            MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_app_data(ep->prepare_msg->a));
                    );
            DBGOUT(BALCEXP(ep->p->proposer.bal);
                   BALCEXP(ep->p->acceptor.promise));
            MAY_DBG(FN; dbg_reply_set(ep->site, "prep_node_set", ep->p->proposer.prep_nodeset);
                    dbg_reply_set(ep->site, "prop_node_set", ep->p->proposer.prop_nodeset);
                    );
            push_msg_3p(ep->site, ep->p, ep->prepare_msg, ep->msgno, normal);
            ep->start_push = now;
          }
        }
      }
      /* When we get here, we know the value for this message number,
         but it may not be the value we tried to push,
         so loop until we have a successful push. */
      unlock_pax_machine(ep->p);
      MAY_DBG(FN; STRLIT(" found finished message ");
              SYCEXP(ep->msgno);
              STRLIT("seconds since last push ");
              NPUT(task_now() - ep->start_push, f);
              STRLIT("ep->client_msg ");
              COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
              );
      MAY_DBG(FN; STRLIT("ep->p->learner.msg ");
              COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p->learner.msg));
              );
      if (match_my_msg(ep->p->learner.msg, ep->client_msg->p)){
        break;
      } else
        GOTO(retry_new);
    }
  next:
    {
      /* Done with this client message: record the elapsed time and free it */
      double now = task_now();
      double used = now -ep->start_propose;
      add_to_filter(used);
      DBGOUT(FN; STRLIT("completed ep->msgno ");
             SYCEXP(ep->msgno); NDBG(used, f); NDBG(median_time(), f);
             STRLIT("seconds since last push "); NDBG(now - ep->start_push, f); );
      MAY_DBG(FN; STRLIT("ep->client_msg ");
              COPY_AND_FREE_GOUT(dbg_pax_msg(ep->client_msg->p));
              );
      if (ep->p) {
        MAY_DBG(FN; STRLIT("ep->p->learner.msg ");
                COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p->learner.msg));
                );
      }
      msg_link_delete(&ep->client_msg);
    }
  }
  FINALLY
  /* Task teardown. NOTE(review): ep->p may already be unlocked here; this
     assumes unlock_pax_machine() tolerates that -- confirm. */
  MAY_DBG(FN; STRLIT("exit "); NDBG(ep->self, d); NDBG(task_now(), f));
  if (ep->p)
    unlock_pax_machine(ep->p);
  replace_pax_msg(&ep->prepare_msg, NULL);
  if (ep->client_msg) { /* If we get here with a client message, we have failed to deliver */
    deliver_to_app(ep->p, ep->client_msg->p->a, delivery_failure);
    msg_link_delete(&ep->client_msg);

  }
  TASK_END;
}
1893
1894
1895 /* }}} */
1896
1897
1898 /* {{{ Executor task */
1899
leader(site_def const * s)1900 static node_no leader(site_def const *s)
1901 {
1902 node_no leader = 0;
1903 for (leader = 0; leader < get_maxnodes(s); leader++) {
1904 if (!may_be_dead(s->detected, leader, task_now()))
1905 return leader;
1906 }
1907 return 0;
1908 }
1909
1910
/* Nonzero when this node (s->nodeno) is the current leader() of s. */
int iamthegreatest(site_def const *s)
{
  node_no const current_leader = leader(s);
  return s->nodeno == current_leader;
}
1915
1916
/*
  Act on a decided message p, dispatching on the cargo type of its first
  app_data (p->a):
  - configuration types call check_tasks();
  - app_type payloads are delivered to the application with delivery_ok;
  - view_msg copies the carried node set into site->global_node_set (only
    when both sets have the same length) and then calls
    deliver_global_view_msg();
  - exit_type, when USE_EXIT_TYPE is defined, logs and calls exit(1).
  Messages without app data, and any other cargo type, are ignored.

  @param site  site definition for p (only the view_msg path checks it for
               NULL)
  @param pma   Paxos machine that decided p
  @param p     the decided message
*/
void execute_msg(site_def const * site, pax_machine *pma, pax_msg *p)
{
  app_data_ptr a = p->a;
  DBGOUT(FN;
         COPY_AND_FREE_GOUT(dbg_pax_msg(p));
         );
  if (a) {
    switch (p->a->body.c_t) {
      case unified_boot_type:
      case add_node_type:
      case remove_node_type:
      case force_config_type:

        check_tasks();
        break;
      case xcom_recover:
        /* purecov: begin deadcode */
        break;
        /* purecov: end */
      case app_type:
        MAY_DBG(FN; STRLIT(" learner.msg ");
                COPY_AND_FREE_GOUT(dbg_pax_msg(pma->learner.msg));
                );
        deliver_to_app(pma, a, delivery_ok);
        break;
      case view_msg:
        MAY_DBG(FN;
                STRLIT(" learner.msg ");
                COPY_AND_FREE_GOUT(dbg_pax_msg(pma->learner.msg)); );
        if(site && site->global_node_set.node_set_len == a->body.app_u_u.present.node_set_len){
          /* NOTE(review): the assert repeats the if condition, and the cast
             drops const on site so global_node_set can be updated in place. */
          assert(site->global_node_set.node_set_len == a->body.app_u_u.present.node_set_len);
          copy_node_set(&a->body.app_u_u.present, &(((site_def *)site)->global_node_set));
          deliver_global_view_msg(site, p->synode);
        }
        break;
#ifdef USE_EXIT_TYPE
      case exit_type:
        g_critical("Unable to get message, process will now exit. Please ensure that the process is restarted");
        exit(1);
        break;
#endif
      default:
        break;
    }
  }
  MAY_DBG(FN; SYCEXP(p->synode));
}
1964
1965
1966
1967 static void read_missing_values(int n);
1968 static void propose_missing_values(int n);
1969
find_value(site_def const * site,unsigned int * wait,int n)1970 static void find_value(site_def const *site, unsigned int *wait, int n)
1971 {
1972 DBGOHK(FN; NDBG(*wait, d));
1973
1974 if(get_nodeno(site) == VOID_NODE_NO){
1975 read_missing_values(n);
1976 return;
1977 }
1978
1979 switch (*wait) {
1980 case 0:
1981 case 1:
1982 read_missing_values(n);
1983 (*wait)++;
1984 break;
1985 case 2:
1986 if (iamthegreatest(site))
1987 propose_missing_values(n);
1988 else
1989 read_missing_values(n);
1990 (*wait)++;
1991 break;
1992 case 3:
1993 propose_missing_values(n);
1994 break;
1995 default:
1996 break;
1997 }
1998 }
1999
/*
  Task-style helper: wait until the Paxos instance for msgno is finished,
  actively trying to obtain the value meanwhile.

  Each round proposes missing values directly when all of these hold:
  - the site has more than one node;
  - this node is the leader;
  - the node owning msgno is absent from global_node_set;
  - that node is suspected by may_be_dead().
  Otherwise the round escalates through find_value().

  It then waits on (*p)->rv with a wakeup_delay() backoff and re-fetches the
  cache entry, since the machine for msgno may have been replaced meanwhile.

  @param p      out: cache entry for msgno (finished when this returns)
  @param msgno  message number to obtain
  @param n      forwarded to read_missing_values()/propose_missing_values()
*/
int get_xcom_message(pax_machine **p, synode_no msgno, int n)
{
  DECL_ENV
  unsigned int wait;
  double delay;
  END_ENV;

  TASK_BEGIN

  ep->wait = 0;
  ep->delay = 0.0;
  *p = get_cache(msgno);

  while (!finished(*p)) {
    /* NOTE(review): find_site_def() may return NULL; this relies on
       get_maxnodes()/find_value() tolerating that -- confirm. msgno.node is
       also used as an index into node_set_val without checking
       node_set_len. */
    site_def const * site = find_site_def(msgno);
    DBGOHK(FN;
           STRLIT(" not finished ");
           SYCEXP(msgno); PTREXP(*p);
           NDBG(ep->wait, u);
           SYCEXP(msgno));
    if (get_maxnodes(site) > 1 && iamthegreatest(site) &&
        site->global_node_set.node_set_val &&
        !site->global_node_set.node_set_val[msgno.node] &&
        may_be_dead(site->detected, msgno.node, task_now())){
      propose_missing_values(n);
    } else {
      find_value(site, &ep->wait, n);
    }
    TIMED_TASK_WAIT(&(*p)->rv, ep->delay = wakeup_delay(ep->delay));
    *p = get_cache(msgno); /* The entry may have been reused while we waited */
  }

  FINALLY
  DBGOHK(FN; SYCEXP(msgno); PTREXP(*p); NDBG(ep->wait, u); SYCEXP(msgno));
  TASK_END;
}
2036
/*
  Move the executor position (executed_msg) to msgno and return it.
  - If msgno is past current_message, current_message is moved forward to
    first_free_synode(msgno).
  - Tasks waiting on exec_wait are woken when the message number increases.
  - executor_site is refreshed to the site of the new executed_msg.
*/
synode_no set_executed_msg(synode_no msgno)
{
  DBGOUT(FN; STRLIT("changing executed_msg from "); SYCEXP(executed_msg); STRLIT(" to "); SYCEXP(msgno));

  /* Keep this node's own proposal position from lagging behind. */
  if (synode_gt(msgno, current_message)) {
    DBGOHK(FN; STRLIT("changing current message"));
    set_current_message(first_free_synode(msgno));
  }

  /* Only a forward move in message number wakes the waiters. */
  if (executed_msg.msgno < msgno.msgno)
    task_wakeup(&exec_wait);

  executed_msg = msgno;
  executor_site = find_site_def(executed_msg);
  return executed_msg;
}
2052
2053
/*
  Return msgno with its node field replaced by this node's number in msgno's
  site. A message number of 0 is treated as 1. If the result would order
  before msgno, the message number is incremented instead.
  Asserts that the site has a non-zero group id and msgno is not null_synode.
*/
static synode_no first_free_synode(synode_no msgno)
{
  site_def const * site = find_site_def(msgno);
  synode_no candidate = msgno;

  /* Dump what is known about the site before the group id assertion fires. */
  if (get_group_id(site) == 0) {
    DBGOUT(FN; PTREXP(site); SYCEXP(msgno));
    if (site) {
      DBGOUT(FN; SYCEXP(site->boot_key); SYCEXP(site->start); COPY_AND_FREE_GOUT(dbg_site_def(site)));
    }
  }
  assert(get_group_id(site) != 0);
  assert(!synode_eq(msgno, null_synode));

  /* Treat message number 0 as 1, as executor_task does for executed_msg. */
  if (candidate.msgno == 0)
    candidate.msgno = 1;
  candidate.node = get_nodeno(site);

  return synode_lt(candidate, msgno) ? incr_msgno(candidate) : candidate;
}
2074
2075
2076
set_current_message(synode_no msgno)2077 synode_no set_current_message(synode_no msgno)
2078 {
2079 MAY_DBG(FN; STRLIT("changing current_message from "); SYCEXP(current_message); STRLIT(" to "); SYCEXP(msgno));
2080 return current_message = msgno;
2081 }
2082
2083
2084 static void handle_learn(site_def const * site, pax_machine *p, pax_msg *m);
2085 static void update_max_synode(pax_msg *p);
2086
#if TASK_DBUG_ON
static void perf_dbg(int *_n, int *_old_n, double *_old_t) MY_ATTRIBUTE((unused));
/*
  Debug-only throughput trace for the executor.
  *_n     - running call counter; incremented on every call
  *_old_n - value of the counter at the last rate report
  *_old_t - task_now() at the last rate report

  When the counter (read before the increment) is a multiple of 5000, it
  prints node number, time, counter, median_time() and executed_msg.
  When more than 1.0 has elapsed on task_now() since the last report, it
  prints the rate (n - old_n) / elapsed and resets *_old_n and *_old_t.
*/
static void perf_dbg(int *_n, int *_old_n, double *_old_t)
{
  int n = *_n;
  int old_n = *_old_n;
  double old_t = *_old_t;

  DBGOHK(FN; SYCEXP(executed_msg));
  if (!(n % 5000)) {
    GET_GOUT;
    NDBG(get_nodeno(get_site_def()), u);
    NDBG(task_now(), f);
    NDBG(n, d);
    NDBG(median_time(), f);
    SYCEXP(executed_msg);
    PRINT_GOUT;
    FREE_GOUT;
  }
  (*_n)++;
  if (task_now() - old_t > 1.0) {
    GET_GOUT;
    NDBG(get_nodeno(get_site_def()), u);
    NDBG(task_now(), f);
    NDBG(n, d);
    NDBG((n - old_n) / (task_now() - old_t), f);
    PRINT_GOUT;
    FREE_GOUT;
    *_old_t = task_now();
    *_old_n = n;
  }
}
#endif
2120
#ifdef IGNORE_LOSERS

/*
  Return nonzero if the node that owns message x (x.node) is not set in
  site's global_node_set; the executor skips such "loser" messages.
  The commented-out code is an earlier majority-based variant.
  NOTE(review): site and global_node_set.node_set_val are dereferenced
  without NULL checks, while get_xcom_message() checks node_set_val before
  a similar lookup. Confirm callers never pass a site without a node set.
*/
static inline int LOSER(synode_no x, site_def const *site)
{
  /* node_no i = 0;
  node_no n = 0;
  node_no maxnodes = get_maxnodes(site);

  if (maxnodes == 0)
    return 0;

  for (i = 0; i < maxnodes; i++) {
    if (site->global_node_set.node_set_val[i]) {
      n++;
    }
  }
  DBGOUT(NEXP(maxnodes,u); NEXP(n,u)); */
  DBGOUT(NEXP(x.node,u); NEXP(site->global_node_set.node_set_val[(x).node],d));
  return
    /* ( n > maxnodes / 2 || (ARBITRATOR_HACK && (2 == maxnodes && 0 == get_nodeno(site)))) && */
    (!(site)->global_node_set.node_set_val[(x).node] );
}

#else
/* Without IGNORE_LOSERS no message is ever treated as a loser. */
#define LOSER(x, site) 0
#endif
2147
2148 static void debug_loser(synode_no x) MY_ATTRIBUTE((unused));
2149 #if defined(TASK_DBUG_ON) && TASK_DBUG_ON
debug_loser(synode_no x)2150 static void debug_loser(synode_no x)
2151 {
2152 if (1 || x.msgno < 10) {
2153 GET_GOUT;
2154 NDBG(get_nodeno(find_site_def(x)), u);
2155 STRLIT(" ignoring loser ");
2156 SYCEXP(x);
2157 SYCEXP(max_synode);
2158 PRINT_GOUT;
2159 FREE_GOUT;
2160 }
2161 }
2162 #else
2163 /* purecov: begin deadcode */
debug_loser(synode_no x MY_ATTRIBUTE ((unused)))2164 static void debug_loser(synode_no x MY_ATTRIBUTE((unused)))
2165 {
2166 }
2167 /* purecov: end */
2168 #endif
2169
2170 /* #define DBGFIX2(x){ GET_GOUT; ADD_F_GOUT("%f ",task_now()); x; PRINT_GOUT; FREE_GOUT; } */
2171 #define DBGFIX2(x)
send_value(site_def const * site,node_no to,synode_no synode)2172 static void send_value(site_def const *site, node_no to, synode_no synode)
2173 {
2174 pax_machine * pm = get_cache(synode);
2175 if (pm && pm->learner.msg) {
2176 pax_msg * msg = clone_pax_msg(pm->learner.msg);
2177 if (msg == NULL) return;
2178 ref_msg(msg);
2179 send_server_msg(site, to, msg);
2180 unref_msg(&msg);
2181 }
2182 }
2183
2184 /* Peturn message number where it is safe for nodes in prev config to exit */
compute_delay(synode_no start)2185 static synode_no compute_delay(synode_no start)
2186 {
2187 start.msgno += event_horizon;
2188 return start;
2189 }
2190
2191 /* Push messages to all nodes which were in the previous site, but not in this */
inform_removed(int index,int all)2192 static void inform_removed(int index,int all)
2193 {
2194 site_def * *sites = 0;
2195 uint32_t site_count = 0;
2196 DBGFIX2(FN; NEXP(index, d));
2197 get_all_site_defs(&sites, &site_count);
2198 while (site_count > 1 && index >= 0 && (uint32_t)(index + 1) < site_count) {
2199 site_def * s = sites[index];
2200 site_def * ps = sites[index+1];
2201
2202 /* Compute diff and push messages */
2203 DBGFIX2(FN; NDBG(index,d); PTREXP(s); if(s)SYCEXP(s->boot_key); PTREXP(ps); if(ps)SYCEXP(ps->boot_key));
2204
2205 if (s && ps) {
2206 node_no i = 0;
2207 DBGFIX2(FN; SYCEXP(s->boot_key); SYCEXP(s->start);
2208 SYCEXP(ps->boot_key); SYCEXP(ps->start));
2209 for (i = 0; i < ps->nodes.node_list_len; i++) { /* Loop over prev site */
2210 if (ps->nodeno != i && !node_exists(&ps->nodes.node_list_val[i], &s->nodes)) {
2211 synode_no synode = s->start;
2212 synode_no end = compute_delay(s->start);
2213 while (!synode_gt(synode, end)) { /* Loop over relevant messages */
2214 send_value(ps, i, synode);
2215 synode = incr_synode(synode);
2216 }
2217 }
2218 }
2219 }
2220 if(! all) /* Early exit if not all configs should be examined */
2221 break;
2222 index--;
2223 }
2224 }
2225
/*
  Apply an add_node_type configuration message.
  The current site_def is cloned, the nodes listed in a->body.app_u_u.nodes
  are added to the clone (add_site_def), and start (getstart(a)) and
  boot_key (a->app_key) are set. The clone is then passed to
  site_install_action() together with a->body.c_t and returned.
  NOTE(review): the node list is traced twice (DBGOUT and MAY_DBG), and
  assert(get_site_def()) runs only after get_site_def() has already been
  passed to clone_site_def().
*/
site_def *handle_add_node(app_data_ptr a)
{
  site_def * site = clone_site_def(get_site_def());
  DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
  MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)); );
  ADD_EVENTS(
    add_event(string_arg("a->app_key"));
    add_synode_event(a->app_key);
  );
  assert(get_site_def());
  assert(site);
  add_site_def(a->body.app_u_u.nodes.node_list_len,
               a->body.app_u_u.nodes.node_list_val, site);
  site->start = getstart(a);
  site->boot_key = a->app_key;
  site_install_action(site, a->body.c_t);
  return site;
}
2244
terminate_and_exit()2245 static void terminate_and_exit()
2246 {
2247 XCOM_FSM(xa_terminate, int_arg(0)); /* Tell xcom to stop */
2248 XCOM_FSM(xa_exit, int_arg(0)); /* Tell xcom to exit */
2249 if (xcom_expel_cb) xcom_expel_cb(0);
2250 }
2251
/*
  Task that waits TASK_DELAY(t), where t is its double argument, and then
  calls terminate_and_exit(). Started by delayed_terminate_and_exit().
*/
int terminator_task(task_arg arg)
{
  DECL_ENV
  double t;  /* delay taken from the task argument */
  END_ENV;

  TASK_BEGIN

  ep->t = get_double_arg(arg);
  TASK_DELAY(ep->t);
  terminate_and_exit();
  FINALLY
  TASK_END;
}
2266
delayed_terminate_and_exit(double t)2267 static void delayed_terminate_and_exit(double t)
2268 {
2269 task_new(terminator_task, double_arg(t), "terminator_task", XCOM_THREAD_DEBUG);
2270 }
2271
/* Nonzero when site s has no nodes at all. */
static inline int is_empty_site(site_def const *s)
{
  return !s->nodes.node_list_len;
}
2276
/*
  Apply a remove_node_type configuration message.
  The current site_def is cloned, the nodes listed in a->body.app_u_u.nodes
  are removed from the clone (remove_site_def), and start (getstart(a)) and
  boot_key (a->app_key) are set. The clone is then passed to
  site_install_action() together with a->body.c_t and returned.
  NOTE(review): unlike handle_add_node(), there is no assert that the clone
  succeeded before it is dereferenced.
*/
site_def *handle_remove_node(app_data_ptr a)
{
  site_def * site = clone_site_def(get_site_def());
  DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&a->body.app_u_u.nodes)));
  ADD_EVENTS(
    add_event(string_arg("a->app_key"));
    add_synode_event(a->app_key);
    add_event(string_arg("nodeno"));
    add_event(uint_arg(get_nodeno(site)));
  );

  remove_site_def(a->body.app_u_u.nodes.node_list_len,
                  a->body.app_u_u.nodes.node_list_val, site);
  site->start = getstart(a);
  site->boot_key = a->app_key;
  site_install_action(site, a->body.c_t);
  return site;
}
2295
/*
  Apply every configuration message in the app_data list starting at a
  (linked through ->next):
  - unified_boot_type and force_config_type install a node group;
  - add_node_type and remove_node_type derive a new site from the current one.
  Processing stops early if a node removal left xcom_shutdown set.
*/
void handle_config(app_data_ptr a)
{
  for (; a; a = a->next) {
    switch (a->body.c_t) {
    case unified_boot_type:
    case force_config_type:
      install_node_group(a);
      break;
    case add_node_type:
      handle_add_node(a);
      break;
    case remove_node_type:
      handle_remove_node(a);
      if (xcom_shutdown)
        return;
      break;
    default:
      break;
    }
  }
}
2320
2321 enum exec_state {
2322 FETCH = 0,
2323 EXECUTE = 1
2324 };
2325 typedef enum exec_state exec_state;
2326
2327 #define NEXTSTATE(x) ep->state = (x)
2328
2329 static synode_no delivered_msg;
2330
get_delivered_msg()2331 synode_no get_delivered_msg()
2332 {
2333 return delivered_msg;
2334 }
2335
is_member(site_def const * site)2336 static inline int is_member(site_def const *site)
2337 {
2338 return site->nodeno != VOID_NODE_NO;
2339 }
2340
2341 /*
2342 Execute xcom message stream.
2343
2344 Beware of the exit logic in this task, which is both simple and not so simple.
Consider three configs C1, C2 and C3. C1 has two nodes, A and B. C2 has only node B.
2346 C3 is empty.
2347 A config with message number N will be activated after a delay of (at least)
2348 alpha messages, where alpha is the size of the pipeline (or the event horizon).
2349
2350 So, C1.start = C1+alpha,
2351 and C2.start = C2+alpha. A, which is removed from C1, cannot exit until a majority
2352 of nodes in the new config C2 (in this case B) has learned all the messages from
2353 config C1, which means all messages less than C2.start. How can A know that a majority
2354 of C2 has learned those messages?
2355
2356 If we denote the first message that is not yet decided (and executed) by E,
2357 the proposers will not try to propose messages with number >= E+alpha,
2358 and all incoming tcp messages with message number >= E+alpha will be ignored.
2359 E is incremented by the executor task, so all messages < E are known.
2360 This means that when the value of E+alpha is known, all messages up to
2361 and including E are also known, although not all messages E+1..E+alpha-1
2362 necessarily are known.
2363
2364 This leads to the requirement that a node which is removed (A) needs to wait until
2365 it knows the value of C2.start+alpha, since by then it knows that a majority
2366 of the nodes in C2 are ready to execute C2.start, which in turn implies that
a majority of nodes in C2 know all the values from config C1. Note that the last
2368 message that should be delivered to the application by a node that is leaving C1 is
2369 C2.start-1, which is the last message of C1.
2370
2371 How does a node that is removed get to know values from the next config?
2372 There are two ways, and we use both. First, the node that tries to exit can
2373 simply ask for the message. get_xcom_message() will do this for all messages
2374 <= max_synode, but it may take some time.
2375 Second, the nodes of C2 can send the messages C2.start..C2.start+alpha
2376 to the nodes that are removed (nodes that are in C1 but not in C2).
2377 inform_removed() does this. We take care to handle the case where configs are close enough
2378 that C0 < C1 <= C0+alpha by tracking the oldest config that contains nodes that are
2379 leaving.
2380
2381 This takes care of nodes leaving C1. What about nodes that leave C2? C3 is empty,
2382 so B, which is leaving C2, cannot wait for messages from C3. But since C3 is empty,
2383 there is no need to wait. It can exit immediately after having executed C3.start-1, the
2384 last message of C2. What if C3.start-1 < C2.start+alpha? This can happen if C2 and C3
2385 are close. In that case, B will exit before A gets the chance to learn C2.start+alpha,
2386 which will leave A hanging forever. Clearly, we need to impose an additional constraint,
2387 that C3.start must be greater than C2.start+alpha. This is taken care of by the special
2388 test for an empty config.
2389
2390 Complicated and confusing? Not really, but there is a clean and simple solution which has
2391 not been implemented yet, since it requires more changes to the consensus logic.
2392 If we require that for the messages C2..C2.start-1 we have a majority from both the nodes
2393 in C1 and the nodes in C2, the nodes not in C2 can exit when they have executed message
2394 C2.start-1, since we then know that a majority of the nodes of C2 has agreed on those messages
2395 as well, so they do not depend on the nodes not in C2 any more. This holds even if C2 is empty.
2396 Note that requiring a majority from both C1 and C2 is different from requiring a majority from
2397 C1+C2, which means that the proposer logic needs to consider answers from two different sets of
2398 acceptors for those messages. Since acceptors are identified by their node number, and the node
numbers need not be the same for both configs, we need to maintain a mapping between the node
2400 numbers of any two consecutive configs. Alternatively, we could remove the node numbers altogether,
2401 and always use a unique, unchanging ID for a node, like IP address + port.
2402
2403 */
2404
2405 /* FIFO which tracks the message numbers where we should deliver queued messages or
2406 inform the removed nodes */
2407 #define FIFO_SIZE 1000
2408 static struct {
2409 int n;
2410 int front;
2411 int rear;
2412 synode_no q[FIFO_SIZE];
2413 }delay_fifo;
2414
addone(int i)2415 static inline int addone(int i)
2416 {
2417 return ((i + 1) % FIFO_SIZE);
2418 }
2419
2420 /* Is queue empty? */
fifo_empty()2421 static inline int fifo_empty()
2422 {
2423 return delay_fifo.n <= 0;
2424 }
2425
2426 /* Is queue full? */
fifo_full()2427 static inline int fifo_full()
2428 {
2429 return delay_fifo.n >= FIFO_SIZE;
2430 }
2431
2432
2433 /* Insert in queue */
fifo_insert(synode_no s)2434 static inline void fifo_insert(synode_no s)
2435 {
2436 if(! fifo_full()){
2437 delay_fifo.n++;
2438 delay_fifo.q[delay_fifo.rear] = s;
2439 delay_fifo.rear = addone(delay_fifo.rear);
2440 }
2441 }
2442
2443 /* Extract first from queue */
fifo_extract()2444 static inline synode_no fifo_extract()
2445 {
2446 if(! fifo_empty()){
2447 synode_no ret = delay_fifo.q[delay_fifo.front];
2448 delay_fifo.front = addone(delay_fifo.front);
2449 delay_fifo.n--;
2450 return ret;
2451 }else{
2452 return null_synode;
2453 }
2454 }
2455
2456 /* Return first in queue, but do not dequeue */
fifo_front()2457 static inline synode_no fifo_front()
2458 {
2459 if(! fifo_empty()){
2460 return delay_fifo.q[delay_fifo.front];
2461 }else{
2462 return null_synode;
2463 }
2464 }
2465
/*
  Executor task: drives delivery of the xcom message stream.

  It alternates between the two exec_state phases:
  - FETCH: wait until executed_msg is learned (get_xcom_message, skipped for
    LOSER messages). Configuration messages are applied immediately
    (handle_config) and a delayed action is queued in delay_fifo. Then
    executed_msg is advanced. FETCH switches to EXECUTE when delay_fifo is
    empty or when executed_msg reaches the FIFO's front entry; otherwise
    fetching continues and delivery is held back.
  - EXECUTE: hand messages delivered_msg .. executed_msg-1 that are not
    no_op (and not LOSER) to execute_msg(), advancing delivered_msg until it
    equals executed_msg, then return to FETCH.

  This node terminates itself (delayed_terminate_and_exit) in two cases:
  - not_member_exit: it is not in a new, non-empty site, and it has fetched
    exit_synode = compute_delay(site->start);
  - empty_exit: the new site is empty, and it has delivered the old site's
    last message, decr_synode(site->start).
  The long comment above explains why these exit points are safe.
*/
static int executor_task(task_arg arg MY_ATTRIBUTE((unused)))
{
  DECL_ENV
  pax_machine * p;          /* machine of the message being fetched/delivered */
  int n ;                   /* perf_dbg() counters */
  int old_n;
  double old_t;
  synode_no exit_synode;    /* message at which this node should exit */
  exec_state state;         /* FETCH or EXECUTE */
  enum {
    no_exit,
    not_member_exit,
    empty_exit
  } exit_type;
  int inform_index;         /* site index passed to inform_removed(); +1 per
                               queued config, -1 per consumed FIFO entry */
  END_ENV;

  TASK_BEGIN
  ep->p = NULL;
  ep->n = 0;
  ep->old_n = 0;
  ep->old_t = task_now();
  ep->exit_synode = null_synode;
  ep->exit_type = no_exit;
  ep->inform_index = -1;
  delay_fifo.n = 0;
  delay_fifo.front = 0;
  delay_fifo.rear = 0;

  set_last_received_config(null_synode);

  if (executed_msg.msgno == 0)
    executed_msg.msgno = 1;
  delivered_msg = executed_msg;
  NEXTSTATE(FETCH);
  executor_site = find_site_def(executed_msg);

  /* NOTE(review): the inner for(;;) has no break, so xcom_shutdown is only
     tested on entry; the loop is left through TERMINATE. */
  while (!xcom_shutdown) {
    for (; ; ) {
      if (ep->state == FETCH) {
        if ( !LOSER(executed_msg, executor_site)) {
          TASK_CALL(get_xcom_message(&ep->p, executed_msg, FIND_MAX));
          DBGOUT(FN; STRLIT("got message "); SYCEXP(ep->p->synode); COPY_AND_FREE_GOUT(dbg_app_data(XAPP)));
          /* Execute unified_boot immediately, but do not deliver site message until we */
          /* are ready to execute messages from the new site definition. */
          /* At that point we can be certain that a majority have learned */
          /* everything from the old site. */

          if ((XAPP) && is_config((XAPP)->body.c_t) &&
              synode_gt(executed_msg, get_site_def()->boot_key)) /* Redo test */
          {
            site_def * site = 0;
            set_last_received_config(executed_msg);
            handle_config(XAPP);
            garbage_collect_site_defs(delivered_msg);
            check_tasks();
            site = get_site_def_rw();
            if (site == 0) {
              TERMINATE;
            }
            DBGFIX2(FN; STRLIT("new config "); SYCEXP(site->boot_key); ); /*SYCEXP(site->start); NEXP(get_nodeno(site),d); NEXP(ARBITRATOR_HACK,d);
            NEXP(ep->exit_type,d); SYCEXP(ep->exit_synode);
            SYCEXP(executed_msg); SYCEXP(max_synode)); */

            /* If site is empty, increase start to allow nodes to terminate before start */
            /* (two event horizons later than requested) */
            if (is_empty_site(site)) {
              site->start = compute_delay(compute_delay(site->start));
            }
            /* Once an exit trigger has been armed, later configs no longer
               queue FIFO entries. */
            if (ep->exit_type == no_exit){/* We have not yet set the exit trigger */
              synode_no delay_until;
              if(is_member(site)){
                delay_until = compute_delay(site->start);
              } else { /* Not in this site */
                /*
                  See if site will be empty when we leave.
                  If the new site is empty, we should exit after having
                  delivered the last message from the old site.
                */
                if (is_empty_site(site)) {
                  ep->exit_synode = decr_synode(site->start);
                  ep->exit_type = empty_exit;
                  delay_until = ep->exit_synode;
                  DBGFIX2(FN; SYCEXP(ep->exit_synode); SYCEXP(executed_msg); SYCEXP(max_synode));
                }else{
                  /*
                    If we are not a member of the new site, we should exit after having
                    seen enough messages from the new site.
                  */
                  ep->exit_synode = compute_delay(site->start);
                  ep->exit_type = not_member_exit;
                  if (!synode_lt(ep->exit_synode, max_synode)){
                    /* We need messages from the next site, so set max_synode accordingly. */
                    set_max_synode(incr_synode(ep->exit_synode));
                  }
                  delay_until = ep->exit_synode;
                  DBGFIX2(FN; SYCEXP(delay_until); SYCEXP(executed_msg); SYCEXP(max_synode));
                  DBGFIX2(FN; SYCEXP(ep->exit_synode); SYCEXP(executed_msg); SYCEXP(max_synode));
                }
              }

              /* Make sure the messages up to delay_until will be fetched. */
              if (synode_gt(delay_until, max_synode))
                set_max_synode(delay_until);
              fifo_insert(delay_until);
              ep->inform_index++;
            }
          } else {
            DBGOUT(FN; SYCEXP(executed_msg); SYCEXP(get_site_def()->boot_key));
          }
        } else {
          DBGOUT(FN; debug_loser(executed_msg); PTREXP(executor_site);
                 COPY_AND_FREE_GOUT(dbg_node_set(executor_site->global_node_set)));
        }
        DBGOUT(FN; NDBG(ep->state, d); SYCEXP(delivered_msg); SYCEXP(executed_msg);
               SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));

        /* See if we should exit when having seen this message */
        if (ep->exit_type == not_member_exit && synode_eq(executed_msg, ep->exit_synode)) {
          inform_removed(ep->inform_index, 1); /* Inform all removed nodes before we exit */
          delayed_terminate_and_exit(TERMINATE_DELAY); /* Tell xcom to stop */
          TERMINATE;
        }

        /* Switch to delivery when nothing is pending, or when a pending
           delay point has been reached; otherwise keep fetching. */
        if (fifo_empty()) {
          NEXTSTATE(EXECUTE);
        } else if (synode_eq(executed_msg, fifo_front())) {
          DBGFIX2(FN; SYCEXP(fifo_front()); SYCEXP(executed_msg);
                  SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
          while(synode_eq(executed_msg, fifo_front())){ /* More than one may match */
            inform_removed(ep->inform_index, 0);
            fifo_extract();
            ep->inform_index--;
          }
          garbage_collect_servers();
          NEXTSTATE(EXECUTE);
        }
        SET_EXECUTED_MSG(incr_synode(executed_msg));
        MAY_DBG(FN; NDBG(ep->state, d); SYCEXP(fifo_front()); SYCEXP(executed_msg));
        MAY_DBG(FN; NDBG(ep->state, d); SYCEXP(ep->exit_synode); SYCEXP(executed_msg));
      } else if (ep->state == EXECUTE) {
        site_def const * x_site = find_site_def(delivered_msg);

        DBGOUT(FN; NDBG(ep->state, d); SYCEXP(delivered_msg); SYCEXP(delivered_msg); SYCEXP(executed_msg);
               SYCEXP(ep->exit_synode); NDBG(ep->exit_type, d));
        ep->p = get_cache(delivered_msg);
        ADD_EVENTS(
          add_event(string_arg("executing message"));
          add_synode_event(ep->p->synode);
        );
        /* NOTE(review): learner.msg and x_site are dereferenced without NULL
           checks; this relies on every non-loser message below executed_msg
           having been learned during FETCH. */
        if (LOSER(delivered_msg, x_site)) {
#ifdef IGNORE_LOSERS
          DBGOUT(FN; debug_loser(delivered_msg); PTREXP(x_site); dbg_node_set(x_site->global_node_set));
#endif
        } else if ((ep->p)->learner.msg->msg_type != no_op) {
          execute_msg(find_site_def(delivered_msg), ep->p, ep->p->learner.msg);
#if defined(TASK_DBUG_ON) && TASK_DBUG_ON
          DBGOUT(perf_dbg(&ep->n, &ep->old_n, &ep->old_t));
#endif
        }
        /* Garbage collect old servers */
        if (synode_eq(delivered_msg, x_site->start)) {
          garbage_collect_servers();
        }
        /* See if we should exit when having delivered this message */
        if (ep->exit_type == empty_exit && synode_eq(delivered_msg, ep->exit_synode)) {
          inform_removed(ep->inform_index, 1); /* Inform all removed nodes before we exit */
          delayed_terminate_and_exit(TERMINATE_DELAY); /* Tell xcom to stop */
          TERMINATE;
        }
        delivered_msg = incr_synode(delivered_msg);
        /* Caught up with the fetcher: go back to fetching. */
        if (synode_eq(delivered_msg, executed_msg)) {
          NEXTSTATE(FETCH);
        }
      } else {
        abort();
      }
    }
  }
  FINALLY
  DBGOUT(FN; STRLIT(" shutdown "); SYCEXP(executed_msg); NDBG(task_now(), f));
  TASK_END;
}
2647
2648
get_sweep_start()2649 static synode_no get_sweep_start()
2650 {
2651 synode_no find = executed_msg;
2652 find.node = get_nodeno(find_site_def(find));
2653 if (find.node < executed_msg.node) {
2654 find = incr_msgno(find);
2655 }
2656 return find;
2657 }
2658
2659
/*
  Sweeper task: proposes "skip" for this node's own message slots that
  nobody has used yet.

  Starting from get_sweep_start(), ep->find is advanced with incr_msgno()
  while it is below max_synode and not too_far().
  For each cached pax_machine, all of the following must hold before the
  machine is marked skip_op and skip_msg() is sent for ep->find:
  - force_delivery is not set;
  - the machine is not busy;
  - its promise count is 0;
  - it has no accepted msg;
  - it is not finished.
  Unless AGGRESSIVE_SWEEP is defined, it yields while !is_only_task().
  After each pass it calls TASK_DEACTIVATE.
*/
static int sweeper_task(task_arg arg MY_ATTRIBUTE((unused)))
{
  DECL_ENV
  synode_no find;  /* next synode to examine */
  END_ENV;

  TASK_BEGIN

  ep->find = get_sweep_start();

  while (!xcom_shutdown) {
    ep->find.group_id = executed_msg.group_id; /* In case group id has changed */
#ifndef AGGRESSIVE_SWEEP
    while (!is_only_task()) {
      TASK_YIELD;
    }
#endif
    ADD_EVENTS(
      add_event(string_arg("sweeper ready"));
      add_synode_event(executed_msg);
    );
    /* DBGOUT(FN; STRLIT("ready to run "); */
    /* SYCEXP(executed_msg); SYCEXP(max_synode); SYCEXP(ep->find)); */
    {
      while (synode_lt(ep->find, max_synode) && ! too_far(ep->find)) {
        /* pax_machine * pm = hash_get(ep->find); */
        pax_machine * pm = 0;
        ADD_EVENTS(
          add_event(string_arg("sweeper examining"));
          add_synode_event(ep->find);
        );
        DBGOUT(FN; STRLIT("examining "); SYCEXP(ep->find));
        /* No node number for us: restart from the executor position if we
           fell behind, and stop this pass if there is still none. */
        if (ep->find.node == VOID_NODE_NO) {
          if(synode_gt(executed_msg, ep->find)){
            ep->find = get_sweep_start();
          }
          if (ep->find.node == VOID_NODE_NO)
            goto deactivate;
        }
        pm = get_cache(ep->find);
        if (pm && !pm->force_delivery) { /* We want full 3 phase Paxos for forced messages */
          /* DBGOUT(FN; dbg_pax_machine(pm)); */
          /* Only skip slots with no Paxos activity at all. */
          if (!is_busy_machine(pm) && pm->acceptor.promise.cnt == 0 && ! pm->acceptor.msg && !finished(pm)) {
            pm->op = skip_op;
            ADD_EVENTS(
              add_event(string_arg("sweeper skipping"));
              add_synode_event(ep->find);
              add_event(string_arg(pax_op_to_str(pm->op)));
            );
            skip_msg(pax_msg_new(ep->find, find_site_def(ep->find)));
            MAY_DBG(FN; STRLIT("skipping "); SYCEXP(ep->find));
            /* MAY_DBG(FN; dbg_pax_machine(pm)); */
          }
        }
        ep->find = incr_msgno(ep->find);
      }
    }
  deactivate:
    TASK_DEACTIVATE;
  }
  FINALLY
  MAY_DBG(FN; STRLIT(" shutdown sweeper "); SYCEXP(executed_msg); NDBG(task_now(), f));
  TASK_END;
}
2724
2725
2726 /* }}} */
2727
/*
  Compute the next wait timeout while waiting for a message to be learned.

  old == 0.0 starts a new wait. The first delay is 0.1 + 5*m + m*r, where:
  - m is median_time(), replaced by 0.1 when it is 0 or above 0.3;
  - r comes from my_drand48().
  Otherwise the previous delay is multiplied by 1.4142136 (exponential
  backoff). Any result above 3.0 is divided by 1.31415926 until it is not.

  A previously disabled variant used 10*m, a 1.0 cutoff for m and a 10.0 cap.
*/
static double wakeup_delay(double old)
{
  double next;

  if (0.0 != old) {
    next = old * 1.4142136; /* Exponential backoff */
  } else {
    double m = median_time();
    if (m == 0.0 || m > 0.3)
      m = 0.1;
    next = 0.1 + 5.0 * m + m * my_drand48();
  }

  /* Bring overly long delays back under the cap. */
  for (; next > 3.0; next /= 1.31415926)
    ;
  return next;
}
2767
/*
  Propose a noop for message find on machine p.
  p->proposer.msg is replaced by a fresh noop, and a clone of it is pushed
  with push_msg_3p(). If the clone cannot be made (out of memory), a debug
  message is logged and nothing is pushed. find must not be too_far().
*/
static void propose_noop(synode_no find, pax_machine *p)
{
  site_def const *site = find_site_def(find);
  pax_msg *clone = NULL;

  assert(! too_far(find));
  replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
  assert(p->proposer.msg);
  create_noop(p->proposer.msg);

  clone = clone_pax_msg(p->proposer.msg);
  if (clone == NULL) {
    G_DEBUG("Unable to propose NoOp due to an OOM error.");
    return;
  }
  push_msg_3p(site, p, clone, find, no_op);
}
2786
2787
/*
  Ask other nodes for the value of message find with a read message.
  Nothing is sent when find's site is unknown or find is this node's own
  slot. A node without a node number in the site uses send_to_others();
  members use send_to_someone().
*/
static void send_read(synode_no find)
{
  site_def const *site = find_site_def(find);
  pax_msg * pm = NULL;

  MAY_DBG(FN; NDBG(get_maxnodes(site),u); NDBG(get_nodeno(site),u););
  if (site == NULL || find.node == get_nodeno(site))
    return;

  pm = pax_msg_new(find, site);
  ref_msg(pm);
  create_read(site, pm);
  MAY_DBG(FN; SYCEXP(find););

  MAY_DBG(FN; NDBG(get_maxnodes(site),u); NDBG(get_nodeno(site),u); PTREXP(pm));
  if (get_nodeno(site) == VOID_NODE_NO)
    send_to_others(site, pm, "send_read");
  else
    send_to_someone(site, pm, "send_read");
  unref_msg(&pm);
}
2812
2813
2814 /* }}} */
2815
2816 /* {{{ Find missing values */
2817
/* Nonzero if machine p may be proposed for: not recently active, not
   finished and not busy. */
static int ok_to_propose(pax_machine *p)
{
  int retval = !(recently_active(p) || finished(p) || is_busy_machine(p));
  MAY_DBG(FN; NDBG(p->synode.node, u); NDBG(recently_active(p),d); NDBG(finished(p),d); NDBG(is_busy_machine(p),d); NDBG(retval, d));
  return retval;
}
2830
2831
read_missing_values(int n)2832 static void read_missing_values(int n)
2833 {
2834 synode_no find = executed_msg;
2835 synode_no end = max_synode;
2836 int i = 0;
2837
2838 MAY_DBG(FN; SYCEXP(find); SYCEXP(end));
2839 if (synode_gt(executed_msg, max_synode) ||
2840 synode_eq(executed_msg, null_synode))
2841 return;
2842
2843 while (!synode_gt(find, end) && i < n && ! too_far(find)) {
2844 pax_machine * p = get_cache(find);
2845 ADD_EVENTS(
2846 add_synode_event(find);
2847 add_synode_event(end);
2848 add_event(string_arg("active "));
2849 add_event(int_arg(recently_active(p)));
2850 add_event(string_arg("finished "));
2851 add_event(int_arg(finished(p)));
2852 add_event(string_arg("busy "));
2853 add_event(int_arg(is_busy_machine(p)));
2854 );
2855 MAY_DBG(FN; SYCEXP(find); SYCEXP(end); NDBG(recently_active(p), d); NDBG(finished(p), d); NDBG(is_busy_machine(p), d));
2856
2857
2858 if (!recently_active(p) && !finished(p) && !is_busy_machine(p)) {
2859 send_read(find);
2860 }
2861 find = incr_synode(find);
2862 i++;
2863 }
2864 }
2865
2866
propose_missing_values(int n)2867 static void propose_missing_values(int n)
2868 {
2869 synode_no find = executed_msg;
2870 synode_no end = max_synode;
2871 int i = 0;
2872
2873 DBGOHK(FN; NDBG(get_maxnodes(get_site_def()), u); SYCEXP(find); SYCEXP(end));
2874 if (
2875 synode_gt(executed_msg, max_synode) ||
2876 synode_eq(executed_msg, null_synode))
2877 return;
2878
2879 MAY_DBG(FN; SYCEXP(find); SYCEXP(end));
2880 i = 0;
2881 while (!synode_gt(find, end) && i < n && ! too_far(find)) {
2882 pax_machine * p = get_cache(find);
2883 DBGOHK(FN; NDBG(ok_to_propose(p),d); TIMECEXP(task_now()); TIMECEXP(p->last_modified); SYCEXP(find))
2884 ;
2885 if(get_nodeno(find_site_def(find)) == VOID_NODE_NO)
2886 break;
2887 if (ok_to_propose(p)) {
2888 if (task_now() - BUILD_TIMEOUT > p->last_modified){
2889 propose_noop(find, p);
2890 }
2891 }
2892 find = incr_synode(find);
2893 i++;
2894 }
2895 }
2896
2897
2898 /* Propose a noop for the range find..end */
request_values(synode_no find,synode_no end)2899 void request_values(synode_no find, synode_no end)
2900 {
2901 DBGOUT(FN; SYCEXP(find); SYCEXP(find); SYCEXP(end); );
2902 while (!synode_gt(find, end) && ! too_far(find)) {
2903 pax_machine * p = get_cache(find);
2904 site_def const *site = find_site_def(find);
2905 if(get_nodeno(site) == VOID_NODE_NO)
2906 break;
2907 if (!finished(p) && !is_busy_machine(p)) {
2908 /* Prepare to send a noop */
2909 replace_pax_msg(&p->proposer.msg, pax_msg_new(find, site));
2910 assert(p->proposer.msg);
2911 create_noop(p->proposer.msg);
2912
2913 DBGOUT(FN; STRLIT("propose"); SYCEXP(find); );
2914 push_msg_3p(site, p, pax_msg_new(find, site), find, no_op);
2915 }
2916 find = incr_synode(find);
2917 }
2918 }
2919
2920
2921 /* }}} */
2922
2923 /* {{{ Message handlers */
#if 0
/* Disabled function variant of reply_msg.
   NOTE(review): it refers to an undeclared `s` and would not compile if enabled. */
void reply_msg(site_def const * site, pax_msg *m)
{
  MAY_DBG(FN; );
  if (get_server(s, m->from)) {
    send_server_msg(s, m->from, m);
  }
}

#else
/*
  reply_msg(m): route reply m back to its sender, m->from.
  Expects `site` and `reply_queue` to be in scope where it is expanded.
  - If m->from is the local node, m is handed to dispatch_op() directly.
  - Else, if m->from is a node of `site`, m's group_id matches the site and
    a server exists for m->from, m is sent with send_server_msg().
  - Otherwise m is linked onto reply_queue via msg_link_new().
  NOTE(review): the expansion is a bare { ... } block rather than
  do { } while (0), so `if (c) reply_msg(m); else ...` would not compile.
*/
#define reply_msg(m) \
  { \
    if(is_local_node((m)->from, site)){ \
      dispatch_op(site, m, NULL); \
    }else{ \
      if(node_no_exists((m)->from, site) && (m)->group_id == get_group_id(site) && get_server(site, (m)->from)){ \
        send_server_msg(site, (m)->from, m); \
      }else{ \
        link_into(&(msg_link_new((m), (m)->from)->l), reply_queue); \
      } \
    } \
  }
#endif
2947
2948
/*
  CREATE_REPLY(x): declare a local `pax_msg *reply` and fill it using
  CLONE_PAX_MSG(reply, x), which is defined elsewhere.
  SEND_REPLY: route `reply` with reply_msg(), then clear the local pointer
  with replace_pax_msg(&reply, NULL), presumably dropping this scope's
  reference.
  Both expect `site` and `reply_queue` in scope, as reply_msg does.
*/
#define CREATE_REPLY(x) pax_msg *reply = NULL; CLONE_PAX_MSG(reply, x)
#define SEND_REPLY reply_msg(reply); replace_pax_msg(&reply, NULL)
2951
safe_app_data_copy(pax_msg ** target,app_data_ptr source)2952 bool_t safe_app_data_copy(pax_msg **target, app_data_ptr source) {
2953 copy_app_data(&(*target)->a, source);
2954 if ((*target)->a == NULL && source != NULL) {
2955 oom_abort = 1;
2956 replace_pax_msg(target, NULL);
2957 return FALSE;
2958 }
2959 return TRUE;
2960 }
2961
/*
  Reply to pm's sender with the value this node learned for synode.
  The reply is a learn-type copy of p->learner.msg: proposal, msg_type and
  app data. If copying the app data fails, nothing is sent; the reply has
  already been released by safe_app_data_copy().
  `site` and `reply_queue` are used by SEND_REPLY.
*/
static void teach_ignorant_node(site_def const * site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
{
  pax_msg *learned = p->learner.msg;
  CREATE_REPLY(pm);
  DBGOUT(FN; SYCEXP(synode));
  reply->synode = synode;
  reply->proposal = learned->proposal;
  reply->msg_type = learned->msg_type;
  if (!safe_app_data_copy(&reply, learned->a))
    return;
  set_learn_type(reply);
  SEND_REPLY;
}
2976
2977
2978 /* Handle incoming read */
handle_read(site_def const * site,pax_machine * p,linkage * reply_queue,pax_msg * pm)2979 static void handle_read(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
2980 {
2981 DBGOUT(FN;
2982 BALCEXP(pm->proposal);
2983 BALCEXP(p->acceptor.promise);
2984 if (p->acceptor.msg)
2985 BALCEXP(p->acceptor.msg->proposal);
2986 STRLIT("type "); STRLIT(pax_msg_type_to_str(pm->msg_type))) ;
2987
2988 if (finished(p)) { /* We have learned a value */
2989 teach_ignorant_node(site, p, pm, pm->synode, reply_queue);
2990 }
2991 }
2992
2993
/*
  miss_prepare() and miss_accept() are compiled only when USE_EXIT_TYPE is
  defined. Each answers pm with a cloned reply whose msg_type is normal:
  miss_prepare() sends ack_prepare_op with a fresh app_data of type
  exit_type, and miss_accept() sends ack_accept_op.

  NOTE(review): if this code is ever enabled, check the following first.
  - miss_accept() takes an extra reference on `reply` with ref_msg() that
    SEND_REPLY's replace_pax_msg(&reply, NULL) presumably does not balance,
    so the reply may leak.
  - miss_accept() sends the incoming `pm`, not `reply`, via
    send_server_msg().
  - miss_accept() indexes a `servers` array that is not declared in the
    visible code.
  - miss_prepare() does not check new_app_data() for NULL.
*/
#ifdef USE_EXIT_TYPE
static void miss_prepare(site_def const * site, pax_msg *pm, linkage *reply_queue)
{
CREATE_REPLY(pm);
DBGOUT(FN; SYCEXP(pm->synode));
reply->msg_type = normal;
reply->a = new_app_data();
reply->a->body.c_t = exit_type;
reply->op = ack_prepare_op;
SEND_REPLY;
}


static void miss_accept(site_def const * site, pax_msg *pm, linkage *reply_queue)
{
CREATE_REPLY(pm);
DBGOUT(FN; SYCEXP(pm->synode));
ref_msg(reply);
reply->msg_type = normal;
reply->op = ack_accept_op;
if (servers[pm->from]) {
send_server_msg(site, pm->from, pm);
}
SEND_REPLY;
}


#endif
3022
handle_simple_prepare(site_def const * site,pax_machine * p,pax_msg * pm,synode_no synode,linkage * reply_queue)3023 static void handle_simple_prepare(site_def const * site, pax_machine *p, pax_msg *pm, synode_no synode, linkage *reply_queue)
3024 {
3025 if (finished(p)) { /* We have learned a value */
3026 MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(finished(p), d));
3027 teach_ignorant_node(site, p, pm, synode, reply_queue);
3028 } else {
3029 int greater = gt_ballot(pm->proposal, p->acceptor.promise); /* Paxos acceptor phase 1 decision */
3030 MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(greater, d));
3031 if (greater || noop_match(p, pm) ) {
3032 CREATE_REPLY(pm);
3033 reply->synode = synode;
3034 if (greater)
3035 p->acceptor.promise = pm->proposal; /* promise to not accept any less */
3036 if (accepted(p)) { /* We have accepted a value */
3037 reply->proposal = p->acceptor.msg->proposal;
3038 reply->msg_type = p->acceptor.msg->msg_type;
3039 MAY_DBG(FN; STRLIT(" already accepted value "); SYCEXP(synode));
3040 reply->op = ack_prepare_op;
3041 safe_app_data_copy(&reply, p->acceptor.msg->a);
3042 if (reply == NULL) return; // Failed to allocate memory for the copy.
3043 } else {
3044 MAY_DBG(FN; STRLIT(" no value synode "); SYCEXP(synode));
3045 reply->op = ack_prepare_empty_op;
3046 }
3047 SEND_REPLY;
3048 }
3049 }
3050 }
3051
3052
/*
  Handle an incoming prepare message. Records trace events and debug
  output, then runs the acceptor phase-1 logic (handle_simple_prepare())
  for pm->synode.
*/
static void handle_prepare(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *pm)
{
ADD_EVENTS(
add_synode_event(p->synode);
add_event(string_arg("pm->from"));
add_event(int_arg(pm->from));
add_event(string_arg(pax_op_to_str(pm->op)));
);
#if 0
DBGOUT(FN;
NDBG(pm->from, d); NDBG(pm->to, d);
SYCEXP(pm->synode);
BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
MAY_DBG(FN; BALCEXP(pm->proposal);
BALCEXP(p->acceptor.promise);
if (p->acceptor.msg)
BALCEXP(p->acceptor.msg->proposal);
STRLIT("type "); STRLIT(pax_msg_type_to_str(pm->msg_type))) ;

handle_simple_prepare(site, p, pm, pm->synode, reply_queue);
}
3076
3077
check_propose(site_def const * site,pax_machine * p)3078 static void check_propose(site_def const * site, pax_machine *p)
3079 {
3080 MAY_DBG(FN; SYCEXP(p->synode);
3081 COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(site)));
3082 );
3083 PAX_MSG_SANITY_CHECK(p->proposer.msg);
3084 if (prep_majority(site, p)) {
3085 p->proposer.msg->proposal = p->proposer.bal;
3086 BIT_ZERO(p->proposer.prop_nodeset);
3087 p->proposer.msg->synode = p->synode;
3088 propose_msg(p->proposer.msg);
3089 p->proposer.sent_prop = p->proposer.bal;
3090 }
3091 }
3092
3093
check_learn(site_def const * site,pax_machine * p)3094 static void check_learn(site_def const * site, pax_machine *p)
3095 {
3096 MAY_DBG(FN; SYCEXP(p->synode);
3097 COPY_AND_FREE_GOUT(dbg_machine_nodeset(p, get_maxnodes(site)));
3098 );
3099 PAX_MSG_SANITY_CHECK(p->proposer.msg);
3100 if (get_nodeno(site) != VOID_NODE_NO && prop_majority(site, p)) {
3101 p->proposer.msg->synode = p->synode;
3102 if (p->proposer.msg->receivers)
3103 free_bit_set(p->proposer.msg->receivers);
3104 p->proposer.msg->receivers = clone_bit_set(p->proposer.prep_nodeset);
3105 BIT_SET(get_nodeno(site), p->proposer.msg->receivers);
3106 if(no_duplicate_payload)
3107 tiny_learn_msg(site, p->proposer.msg);
3108 else
3109 learn_msg(site, p->proposer.msg);
3110 p->proposer.sent_learn = p->proposer.bal;
3111 }
3112 }
3113
3114
do_learn(site_def const * site MY_ATTRIBUTE ((unused)),pax_machine * p,pax_msg * m)3115 static void do_learn(site_def const * site MY_ATTRIBUTE((unused)), pax_machine *p, pax_msg *m)
3116 {
3117 ADD_EVENTS(
3118 add_synode_event(p->synode);
3119 add_event(string_arg("m->from"));
3120 add_event(int_arg(m->from));
3121 add_event(string_arg(pax_op_to_str(m->op)));
3122 );
3123 /* FN; SYCEXP(p->synode); SYCEXP(m->synode); STRLIT(NEWLINE); */
3124 MAY_DBG(FN; SYCEXP(p->synode); SYCEXP(m->synode);
3125 dbg_bitset(m->receivers, get_maxnodes(site));
3126 );
3127 if (m->a)
3128 m->a->chosen = TRUE;
3129 replace_pax_msg(&p->acceptor.msg, m);
3130 replace_pax_msg(&p->learner.msg, m);
3131 /*
3132 Track memory used by client data in the cache.
3133 If we do not care about instances that are being decided,
3134 it is only necessary to compute the added memory when we
3135 record the outcome of a consensus round.
3136 */
3137 add_cache_size(pax_machine_size(p));
3138 /* Shrink the cache size if necessary */
3139 shrink_cache();
3140 }
3141
3142
handle_simple_ack_prepare(site_def const * site MY_ATTRIBUTE ((unused)),pax_machine * p,pax_msg * m)3143 static void handle_simple_ack_prepare(site_def const * site MY_ATTRIBUTE((unused)), pax_machine *p, pax_msg *m)
3144 {
3145 if(get_nodeno(site) != VOID_NODE_NO)
3146 BIT_SET(m->from, p->proposer.prep_nodeset);
3147 }
3148
3149
/*
  Proposer side: an ack_prepare_op answer, meaning the sending node has
  already accepted a value.

  Only answers to our current ballot (m->reply_to == p->proposer.bal) from
  a known sender (m->from != VOID_NODE_NO) are counted. For those:
  - the sender is added to the prepare node set;
  - if the returned accepted proposal is higher than the one in
    p->proposer.msg, m replaces it (Paxos rule: propose the value with the
    highest accepted ballot);
  - check_propose() runs only while m->reply_to is greater than
    p->proposer.sent_prop, i.e. no accept round has been started with this
    ballot yet.
  dispatch_op() ensures p->proposer.msg is non-NULL before calling this.

  NOTE(review): the #if 0 block refers to `pm`, which is not a parameter
  of this function.
*/
static void handle_ack_prepare(site_def const * site, pax_machine *p, pax_msg *m)
{
ADD_EVENTS(
add_synode_event(p->synode);
add_event(string_arg("m->from"));
add_event(int_arg(m->from));
add_event(string_arg(pax_op_to_str(m->op)));
);
#if 0
DBGOUT(FN;
NDBG(pm->from, d); NDBG(pm->to, d);
SYCEXP(pm->synode);
BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
assert(m);
MAY_DBG(FN;
if (p->proposer.msg)
BALCEXP(p->proposer.msg->proposal);
BALCEXP(p->proposer.bal);
BALCEXP(m->reply_to);
BALCEXP(p->proposer.sent_prop);
SYCEXP(m->synode)) ;
if (m->from != VOID_NODE_NO && eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
handle_simple_ack_prepare(site, p, m);
if (gt_ballot(m->proposal, p->proposer.msg->proposal)) { /* greater */
replace_pax_msg(&p->proposer.msg, m);
assert(p->proposer.msg);
}
if (gt_ballot(m->reply_to, p->proposer.sent_prop))
check_propose(site, p);
}
}
3183
3184
/*
  Proposer side: an ack_prepare_empty_op answer, meaning the sending node
  has not accepted any value for this instance.

  Only answers to our current ballot from a known sender are counted. The
  sender is added to the prepare node set, and check_propose() runs only
  while m->reply_to is greater than p->proposer.sent_prop. Our own proposal
  is kept unchanged.

  NOTE(review): the #if 0 block refers to `pm`, which is not a parameter
  of this function.
*/
static void handle_ack_prepare_empty(site_def const * site, pax_machine *p, pax_msg *m)
{
ADD_EVENTS(
add_synode_event(p->synode);
add_event(string_arg("m->from"));
add_event(int_arg(m->from));
add_event(string_arg(pax_op_to_str(m->op)));
);
#if 0
DBGOUT(FN;
NDBG(pm->from, d); NDBG(pm->to, d);
SYCEXP(pm->synode);
BALCEXP(pm->proposal); BALCEXP(p->acceptor.promise));
#endif
MAY_DBG(FN;
if (p->proposer.msg)
BALCEXP(p->proposer.msg->proposal);
BALCEXP(p->proposer.bal);
BALCEXP(m->reply_to);
BALCEXP(p->proposer.sent_prop);
SYCEXP(m->synode)) ;
if (m->from != VOID_NODE_NO && eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my prepare */
handle_simple_ack_prepare(site, p, m);
if (gt_ballot(m->reply_to, p->proposer.sent_prop))
check_propose(site, p);
}
}
3213
3214
3215 /* #define AUTO_MSG(p,synode) {if(!(p)){replace_pax_msg(&(p), pax_msg_new(synode, site));} */
3216
handle_simple_accept(site_def const * site,pax_machine * p,pax_msg * m,synode_no synode,linkage * reply_queue)3217 static void handle_simple_accept(site_def const * site, pax_machine *p, pax_msg *m, synode_no synode, linkage *reply_queue)
3218 {
3219 if (finished(p)) { /* We have learned a value */
3220 teach_ignorant_node(site, p, m, synode, reply_queue);
3221 } else if (!gt_ballot(p->acceptor.promise, m->proposal) || /* Paxos acceptor phase 2 decision */
3222 noop_match(p, m) ) {
3223 MAY_DBG(FN; SYCEXP(m->synode); STRLIT("accept "); BALCEXP(m->proposal));
3224 replace_pax_msg(&p->acceptor.msg, m);
3225 {
3226 CREATE_REPLY(m);
3227 reply->op = ack_accept_op;
3228 reply->synode = synode;
3229 SEND_REPLY;
3230 }
3231 }
3232 }
3233
3234
/*
  Handle an incoming accept message. The value is accepted if our promise
  is not greater than its ballot; see handle_simple_accept(). Sanity-checks
  m and records trace events before delegating.
*/
static void handle_accept(site_def const * site, pax_machine *p, linkage *reply_queue, pax_msg *m)
{
MAY_DBG(FN;
BALCEXP(p->acceptor.promise);
BALCEXP(m->proposal);
STREXP(pax_msg_type_to_str(m->msg_type)));
PAX_MSG_SANITY_CHECK(m);
ADD_EVENTS(
add_synode_event(p->synode);
add_event(string_arg("m->from"));
add_event(int_arg(m->from));
add_event(string_arg(pax_op_to_str(m->op)));
);

handle_simple_accept(site, p, m, m->synode, reply_queue);
}
3252
3253
/*
  Proposer side: handle an ack_accept_op answer.

  Counted only when this node is part of `site`, the sender is known, and
  the answer refers to our current ballot. The sender is added to the
  accept (prop) node set, and check_learn() runs while m->proposal is
  greater than p->proposer.sent_learn, i.e. before a learn has been sent
  for that ballot.
*/
static void handle_ack_accept(site_def const * site, pax_machine *p, pax_msg *m)
{
ADD_EVENTS(
add_synode_event(p->synode);
add_event(string_arg("m->from"));
add_event(int_arg(m->from));
add_event(string_arg(pax_op_to_str(m->op)));
);
MAY_DBG(FN; SYCEXP(m->synode); BALCEXP(p->proposer.bal); BALCEXP(p->proposer.sent_learn); BALCEXP(m->proposal); BALCEXP(m->reply_to);
);
MAY_DBG(FN; SYCEXP(p->synode);
if (p->acceptor.msg)
BALCEXP(p->acceptor.msg->proposal);
BALCEXP(p->proposer.bal);
BALCEXP(m->reply_to);
) ;
if (get_nodeno(site) != VOID_NODE_NO && m->from != VOID_NODE_NO &&
eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my accept */
BIT_SET(m->from, p->proposer.prop_nodeset);
if (gt_ballot(m->proposal, p->proposer.sent_learn))
check_learn(site, p);
}
}
3278
3279 /* Configure all messages in interval start, end to be forced */
force_interval(synode_no start,synode_no end)3280 static void force_interval(synode_no start, synode_no end)
3281 {
3282 while (synode_lt(start, end)) {
3283 pax_machine * p = get_cache(start);
3284 if(get_nodeno(find_site_def(start)) == VOID_NODE_NO)
3285 break;
3286 /* if(! finished(p)) */
3287 p->force_delivery = 1;
3288 /* Old nodesets are null and void */
3289 BIT_ZERO(p->proposer.prep_nodeset);
3290 BIT_ZERO(p->proposer.prep_nodeset);
3291 start = incr_synode(start);
3292 }
3293 }
3294
start_force_config(site_def * s)3295 static void start_force_config(site_def *s)
3296 {
3297 synode_no end = s->boot_key;
3298
3299 synode_set_to_event_horizon(&end);
3300
3301 DBGOUT(FN; SYCEXP(executed_msg); SYCEXP(end));
3302 if (synode_gt(end, max_synode))
3303 set_max_synode(end);
3304
3305 free_site_def(forced_config);
3306 forced_config = s;
3307 force_interval(executed_msg, max_synode); /* Force everything in the pipeline */
3308 }
3309
3310
/*
  Learn value m for instance p.

  If p is not yet decided, m is recorded via do_learn(). In addition:
  - a unified_boot_type payload feeds the xa_net_boot event into the XCom
    state machine;
  - a forced message (m->force_delivery) with app_data whose type is
    add_node, remove_node or force_config installs the resulting
    configuration immediately via start_force_config().
  For any forced message with app_data, all instances from executed_msg up
  to getstart(m->a) are then marked as forced, whatever the payload type.

  Tasks waiting on p->rv are woken in all cases, including re-learns.
*/
static void handle_learn(site_def const * site, pax_machine *p, pax_msg *m)
{
MAY_DBG(FN; STRLIT("proposer nodeset ");
dbg_bitset(p->proposer.prop_nodeset, get_maxnodes(site));
);
MAY_DBG(FN; STRLIT("receivers ");
dbg_bitset(m->receivers, get_maxnodes(site));
);
MAY_DBG(FN; NDBG(task_now(), f); SYCEXP(p->synode);
COPY_AND_FREE_GOUT(dbg_app_data(m->a));
);

PAX_MSG_SANITY_CHECK(m);

if (!finished(p)) { /* Avoid re-learn */
do_learn(site, p, m);
/* Check for special messages */
if(m->a && m->a->body.c_t == unified_boot_type){
DBGOUT(FN; STRLIT("Got unified_boot "); SYCEXP(p->synode); SYCEXP(m->synode); );
XCOM_FSM(xa_net_boot, void_arg(m->a));
}
/* See if someone is forcing a new config */
if(m->force_delivery && m->a){
DBGOUT(FN; STRLIT("Got forced config "); SYCEXP(p->synode); SYCEXP(m->synode); );
/* Configure all messages from executed_msg until start of new config
as forced messages so they will eventually be finished */
/* Immediately install this new config */
switch (m->a->body.c_t) {
case add_node_type:
/* purecov: begin deadcode */
start_force_config(clone_site_def(handle_add_node(m->a)));
break;
/* purecov: end */
case remove_node_type:
/* purecov: begin deadcode */
start_force_config(clone_site_def(handle_remove_node(m->a)));
break;
/* purecov: end */
case force_config_type:
start_force_config(clone_site_def(install_node_group(m->a)));
break;
default:
break;
}
force_interval(executed_msg, getstart(m->a));
}
}

task_wakeup(&p->rv);
}
3362
3363
3364 /* Skip this value */
handle_skip(site_def const * site,pax_machine * p,pax_msg * m)3365 static void handle_skip(site_def const * site, pax_machine *p, pax_msg *m)
3366 {
3367 /* MAY_DBG(FN;); */
3368 /* MAY_DBG(FN; NDBG(task_now(),f); SYCEXP(p->msg->synode)); */
3369 if (!finished(p)) {
3370 skip_value(m);
3371 do_learn(site, p, m);
3372 }
3373 /* MAY_DBG(FN; STRLIT("taskwakeup "); SYCEXP(p->msg->synode)); */
3374 task_wakeup(&p->rv);
3375 }
3376
3377
handle_client_msg(pax_msg * p)3378 static void handle_client_msg(pax_msg *p)
3379 {
3380 if (!p || p->a == NULL)
3381 /* discard invalid message */
3382 return;
3383
3384 {
3385 msg_link * ml = msg_link_new(p, VOID_NODE_NO);
3386
3387 /* Put it in the proposer queue */
3388 ADD_T_EV(task_now(), __FILE__, __LINE__, "handle_client_msg");
3389 channel_put(&prop_input_queue, &ml->l);
3390 }
3391 }
3392
/* Time (task_now()) at which need_boot_op was last sent by handle_alive(). */
static double sent_alive = 0.0;

/*
  Handle an incoming alive/ping-style message; dispatch_op() calls this
  for several message types.

  If this node has not finished booting (client_boot_done is false), the
  message did not come from this node, and the sender's group is not a
  dead site, answer with need_boot_op. Such answers are rate-limited to one
  per 1.0 unit of task_now() time (presumably seconds).

  For xcom_boot_type payloads, the first node in the carried node list must
  exist, uid included, in the current site definition. Otherwise the ping
  is taken to come from a different incarnation of this node and no
  need_boot_op is sent. The node list comes from the network and may be
  empty (a zero-length XDR array). That case is now treated like an unknown
  node instead of reading node_list_val[0] out of bounds.
*/
static inline void handle_alive(site_def const * site, linkage *reply_queue, pax_msg *pm)
{
  int not_to_oneself = (pm->from != get_nodeno(site) && pm->from != pm->to);
  DBGOUT(FN; SYCEXP(pm->synode); NDBG(pm->from,u); NDBG(pm->to,u); );

  /*
    This code will check if the ping is intended to us.
    If the encoded node does not exist in the current configuration,
    we avoid sending need_boot_op, since it must be from a different
    reincarnation of this node.
  */
  if (site && pm->a && pm->a->body.c_t == xcom_boot_type) {
    DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&pm->a->body.app_u_u.nodes)););
    if (pm->a->body.app_u_u.nodes.node_list_len == 0 ||
        pm->a->body.app_u_u.nodes.node_list_val == NULL) {
      /* No node to check against: cannot be meant for us. */
      not_to_oneself = 0;
    } else {
      not_to_oneself = not_to_oneself &&
        node_exists_with_uid(&pm->a->body.app_u_u.nodes.node_list_val[0],
                             &get_site_def()->nodes);
    }
  }

  if (!client_boot_done && /* Already done? */
      not_to_oneself && /* Not to oneself */
      !is_dead_site(pm->group_id)) { /* Avoid dealing with zombies */
    double t = task_now();
    if (t - sent_alive > 1.0) {
      CREATE_REPLY(pm);
      reply->op = need_boot_op;
      SEND_REPLY;
      sent_alive = t;
      DBGOUT(FN; STRLIT("sent need_boot_op"); );
    }
  }
}
3427
3428
update_max_synode(pax_msg * p)3429 static void update_max_synode(pax_msg *p)
3430 {
3431 #if 0
3432 /* if (group_id == 0 || synode_eq(max_synode, null_synode) || */
3433 /* synode_gt(p->max_synode, max_synode)) { */
3434 /* set_max_synode(p->max_synode); */
3435 /* } */
3436 if (group_id == 0 || synode_eq(max_synode, null_synode) ||
3437 (p->msg_type == normal &&
3438 (max_synode.group_id == 0 || p->synode.group_id == 0 ||
3439 max_synode.group_id == p->synode.group_id) &&
3440 synode_gt(p->synode, max_synode))) {
3441 set_max_synode(p->synode);
3442 }
3443 #else
3444 if (is_dead_site(p->group_id))
3445 return;
3446 if (get_group_id(get_site_def()) == 0 || max_synode.group_id == 0) {
3447 set_max_synode(p->synode);
3448 } else if (max_synode.group_id == p->synode.group_id) {
3449 if (synode_gt(p->synode, max_synode)) {
3450 set_max_synode(p->synode);
3451 }
3452 if (synode_gt(p->max_synode, max_synode)) {
3453 set_max_synode(p->max_synode);
3454 }
3455 }
3456 #endif
3457 }
3458
3459
3460 /* Add app_data to message cache */
3461 /* purecov: begin deadcode */
add_to_cache(app_data_ptr a,synode_no synode)3462 void add_to_cache(app_data_ptr a, synode_no synode)
3463 {
3464 pax_machine * pm = get_cache(synode);
3465 pax_msg * msg = pax_msg_new_0(synode);
3466 ref_msg(msg);
3467 assert(pm);
3468 safe_app_data_copy(&msg, a);
3469 if (msg != NULL) {
3470 set_learn_type(msg);
3471 /* msg->unique_id = a->unique_id; */
3472 do_learn(0, pm, msg);
3473 unref_msg(&msg);
3474 }
3475 }
3476 /* purecov: end */
3477
3478 /* }}} */
3479
3480 /* {{{ Message dispatch */
/*
  Helpers for printing a ballot: BAL_FMT is a printf-style format string,
  and BAL_MEM(x) expands to the matching cnt and node arguments of ballot x.
*/
#define BAL_FMT "ballot {cnt %d node %d}"
#define BAL_MEM(x) (x).cnt, (x).node

/* Number of client_msg operations seen by dispatch_op(). */
static int clicnt = 0;
3485
is_reincarnation_adding(app_data_ptr a)3486 static u_int is_reincarnation_adding(app_data_ptr a)
3487 {
3488 /* Get information on the current site definition */
3489 const site_def* new_site_def= get_site_def();
3490 const site_def* valid_site_def= find_site_def(executed_msg);
3491
3492 /* Get information on the nodes to be added */
3493 u_int nodes_len = a->body.app_u_u.nodes.node_list_len;
3494 node_address* nodes_to_change= a->body.app_u_u.nodes.node_list_val;
3495
3496 u_int i = 0;
3497 for(; i < nodes_len; i++)
3498 {
3499 if (node_exists(&nodes_to_change[i], &new_site_def->nodes) ||
3500 node_exists(&nodes_to_change[i], &valid_site_def->nodes))
3501 {
3502 /*
3503 We are simply ignoring the attempt to add a node to the
3504 group when there is an old incarnation of it, meaning
3505 that the node has crashed and restarted so fastly that
3506 nobody has noticed that it has gone.
3507
3508 In XCOM, the group is not automatically reconfigured
3509 and it is possible to start reusing a node that has
3510 crashed and restarted without reconfiguring the group
3511 by adding the node back to it.
3512
3513 However, this operation may be unsafe because XCOM
3514 does not implement a crash-recovery model and nodes
3515 suffer from amnesia after restarting the service. In
3516 other words this may lead to inconsistency issues in
3517 the paxos protocol.
3518
3519 Unfortunately, preventing that a node is added back
3520 to the system where there is an old incarnation will
3521 not fix this problem since other changes are required.
3522 */
3523 G_MESSAGE("Old incarnation found while trying to add node %s %.*s.",
3524 nodes_to_change[i].address,
3525 nodes_to_change[i].uuid.data.data_len,
3526 nodes_to_change[i].uuid.data.data_val
3527 );
3528 return 1;
3529 }
3530 }
3531
3532 return 0;
3533 }
3534
is_reincarnation_removing(app_data_ptr a)3535 static u_int is_reincarnation_removing(app_data_ptr a)
3536 {
3537 /* Get information on the current site definition */
3538 const site_def* new_site_def= get_site_def();
3539
3540 /* Get information on the nodes to be added */
3541 u_int nodes_len = a->body.app_u_u.nodes.node_list_len;
3542 node_address* nodes_to_change= a->body.app_u_u.nodes.node_list_val;
3543
3544 u_int i = 0;
3545 for(; i < nodes_len; i++)
3546 {
3547 if (!node_exists_with_uid(&nodes_to_change[i], &new_site_def->nodes))
3548 {
3549 /*
3550 We cannot allow an upper-layer to remove a new incarnation
3551 of a node, when it tries to remove an old one.
3552 */
3553 G_MESSAGE("Old incarnation found while trying to "
3554 "remove node %s %.*s.",
3555 nodes_to_change[i].address,
3556 nodes_to_change[i].uuid.data.data_len,
3557 nodes_to_change[i].uuid.data.data_val
3558 );
3559
3560 return 1;
3561 }
3562 }
3563
3564 return 0;
3565 }
3566
3567 /**
3568 * Logs the fact that an add/remove node request is aimed at another group.
3569 *
3570 * @param a a pointer to the app_data of the configuration command
3571 * @param message_fmt a formatted message to log, containing a single %s that will be replaced by the node's address
3572 */
log_cfgchange_wrong_group(app_data_ptr a,const char * const message_fmt)3573 static void log_cfgchange_wrong_group(app_data_ptr a, const char *const message_fmt)
3574 {
3575 u_int const nr_nodes = a->body.app_u_u.nodes.node_list_len;
3576 u_int i;
3577 for (i = 0; i < nr_nodes; i++)
3578 {
3579 char const *const address = a->body.app_u_u.nodes.node_list_val[i].address;
3580 G_WARNING(message_fmt, address);
3581 }
3582 }
3583
3584 /**
3585 * Validates if a configuration command can be executed.
3586 * Checks whether the configuration command is aimed at the correct group.
3587 * Checks whether the configuration command pertains to a node reincarnation.
3588 *
3589 * @param p a pointer to the pax_msg of the configuration command
3590 * @retval REQUEST_OK if the reconfiguration command can be executed
3591 * @retval REQUEST_RETRY if XCom is still booting
3592 * @retval REQUEST_FAIL if the configuration command cannot be executed
3593 */
can_execute_cfgchange(pax_msg * p)3594 static client_reply_code can_execute_cfgchange(pax_msg *p)
3595 {
3596 app_data_ptr a = p->a;
3597
3598 if (executed_msg.msgno <= 2)
3599 return REQUEST_RETRY;
3600
3601 if (a && a->group_id != 0 && a->group_id != executed_msg.group_id)
3602 {
3603 switch (a->body.c_t) {
3604 case add_node_type:
3605 log_cfgchange_wrong_group(a, "The request to add %s to the group has been rejected because it is aimed at another group");
3606 break;
3607 case remove_node_type:
3608 log_cfgchange_wrong_group(a, "The request to remove %s from the group has been rejected because it is aimed at another group");
3609 break;
3610 case force_config_type:
3611 G_WARNING("The request to force the group membership has been rejected because it is aimed at another group");
3612 break;
3613 default:
3614 assert(0 && "A cargo_type different from {add_node_type, remove_node_type, force_config_type} should not have hit this code path");
3615 }
3616 return REQUEST_FAIL;
3617 }
3618
3619 if (a && a->body.c_t == add_node_type && is_reincarnation_adding(a))
3620 return REQUEST_FAIL;
3621
3622 if (a && a->body.c_t == remove_node_type && is_reincarnation_removing(a))
3623 return REQUEST_FAIL;
3624
3625 return REQUEST_OK;
3626 }
3627
activate_sweeper()3628 static void activate_sweeper()
3629 {
3630 if (sweeper) {
3631 ADD_EVENTS(
3632 add_event(string_arg("sweeper activated max_synode"));
3633 add_synode_event(max_synode);
3634 );
3635 task_activate(sweeper);
3636 }
3637 }
3638
dispatch_op(site_def const * site,pax_msg * p,linkage * reply_queue)3639 pax_msg *dispatch_op(site_def const *site, pax_msg *p, linkage *reply_queue)
3640 {
3641 pax_machine * pm = NULL;
3642 site_def * dsite = find_site_def_rw(p->synode);
3643 int in_front = too_far(p->synode);
3644
3645 if(p->force_delivery){
3646 /* Ensure that forced message can be processed */
3647 in_front = 0;
3648 }
3649
3650 if (dsite && p->op != client_msg){
3651 note_detected(dsite, p->from);
3652 update_delivered(dsite, p->from, p->delivered_msg);
3653 }
3654
3655 MAY_DBG(FN; STRLIT("incoming message ");
3656 COPY_AND_FREE_GOUT(dbg_pax_msg(p));
3657 );
3658 ADD_EVENTS(
3659 add_synode_event(p->synode);
3660 add_event(string_arg("p->from"));
3661 add_event(int_arg(p->from));
3662 add_event(string_arg(pax_op_to_str(p->op)));
3663 );
3664 switch (p->op) {
3665 case client_msg:
3666 clicnt++;
3667 if (p->a && (p->a->body.c_t == enable_arbitrator)) {
3668 CREATE_REPLY(p);
3669 DBGOUT(FN; STRLIT("Got enable_arbitrator from client"); SYCEXP(p->synode); );
3670 ARBITRATOR_HACK = 1;
3671 reply->op = xcom_client_reply;
3672 reply->cli_err = REQUEST_OK;
3673 SEND_REPLY;
3674 break;
3675 }
3676 if (p->a && (p->a->body.c_t == disable_arbitrator)) {
3677 CREATE_REPLY(p);
3678 DBGOUT(FN; STRLIT("Got disable_arbitrator from client"); SYCEXP(p->synode); );
3679 ARBITRATOR_HACK = 0;
3680 reply->op = xcom_client_reply;
3681 reply->cli_err = REQUEST_OK;
3682 SEND_REPLY;
3683 break;
3684 }
3685 if (p->a && (p->a->body.c_t == set_cache_limit)) {
3686 CREATE_REPLY(p);
3687 DBGOUT(FN; STRLIT("Got set_cache_limit from client"); SYCEXP(p->synode); );
3688 if(the_app_xcom_cfg){
3689 set_max_cache_size(p->a->body.app_u_u.cache_limit);
3690 reply->cli_err = REQUEST_OK;
3691 }else{
3692 reply->cli_err = REQUEST_FAIL;
3693 }
3694 reply->op = xcom_client_reply;
3695 SEND_REPLY;
3696 break;
3697 }
3698 if (p->a && (p->a->body.c_t == x_terminate_and_exit)) {
3699 CREATE_REPLY(p);
3700 DBGOUT(FN; STRLIT("Got terminate_and_exit from client"); SYCEXP(p->synode); );
3701 reply->op = xcom_client_reply;
3702 reply->cli_err = REQUEST_OK;
3703 SEND_REPLY;
3704 /*
3705 The function frees sites which is used by SEND_REPLY,
3706 so it should be called after SEND_REPLY.
3707 */
3708 terminate_and_exit();
3709 break;
3710 }
3711 if (p->a && (p->a->body.c_t == add_node_type ||
3712 p->a->body.c_t == remove_node_type ||
3713 p->a->body.c_t == force_config_type)) {
3714 client_reply_code cli_err;
3715 CREATE_REPLY(p);
3716 reply->op = xcom_client_reply;
3717 reply->cli_err = cli_err = can_execute_cfgchange(p);
3718 SEND_REPLY;
3719 if (cli_err != REQUEST_OK) {
3720 break;
3721 }
3722 }
3723 if (p->a && p->a->body.c_t == unified_boot_type) {
3724 DBGOUT(FN; STRLIT("Got unified_boot from client"); SYCEXP(p->synode); );
3725 DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3726 DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3727 XCOM_FSM(xa_net_boot, void_arg(p->a));
3728 }
3729 if (p->a && p->a->body.c_t == add_node_type) {
3730 DBGOUT(FN; STRLIT("Got add_node from client"); SYCEXP(p->synode); );
3731 DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3732 DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3733 assert(get_site_def());
3734 }
3735 if (p->a && p->a->body.c_t == remove_node_type) {
3736 DBGOUT(FN; STRLIT("Got remove_node from client"); SYCEXP(p->synode); );
3737 DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3738 DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3739 assert(get_site_def());
3740 }
3741 if (p->a && p->a->body.c_t == force_config_type) {
3742 DBGOUT(FN; STRLIT("Got new config from client"); SYCEXP(p->synode); );
3743 DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(&p->a->body.app_u_u.nodes)); );
3744 DBGOUT(STRLIT("handle_client_msg "); NDBG(p->a->group_id, x));
3745 assert(get_site_def());
3746 XCOM_FSM(xa_force_config, void_arg(p->a));
3747 }
3748 handle_client_msg(p);
3749 break;
3750 case initial_op:
3751 break;
3752 case read_op:
3753 pm = get_cache(p->synode);
3754 assert(pm);
3755 if(client_boot_done)
3756 handle_alive(site, reply_queue, p);
3757 handle_read(site, pm, reply_queue, p);
3758 break;
3759 case prepare_op:
3760 pm = get_cache(p->synode);
3761 assert(pm);
3762 if(p->force_delivery)
3763 pm->force_delivery = 1;
3764 pm->last_modified = task_now();
3765 if(client_boot_done)
3766 handle_alive(site, reply_queue, p);
3767 handle_prepare(site, pm, reply_queue, p);
3768 break;
3769 case ack_prepare_op:
3770 if (in_front || !is_cached(p->synode))
3771 break;
3772 pm = get_cache(p->synode);
3773 if(p->force_delivery)
3774 pm->force_delivery = 1;
3775 if (!pm->proposer.msg)
3776 break;
3777 assert(pm && pm->proposer.msg);
3778 handle_ack_prepare(site, pm, p);
3779 break;
3780 case ack_prepare_empty_op:
3781 if (in_front || !is_cached(p->synode))
3782 break;
3783 pm = get_cache(p->synode);
3784 if(p->force_delivery)
3785 pm->force_delivery = 1;
3786 if (!pm->proposer.msg)
3787 break;
3788 assert(pm && pm->proposer.msg);
3789 handle_ack_prepare_empty(site, pm, p);
3790 break;
3791 case accept_op:
3792 pm = get_cache(p->synode);
3793 assert(pm);
3794 if(p->force_delivery)
3795 pm->force_delivery = 1;
3796 pm->last_modified = task_now();
3797 handle_alive(site, reply_queue, p);
3798 handle_accept(site, pm, reply_queue, p);
3799 break;
3800 case ack_accept_op:
3801 if (in_front || !is_cached(p->synode))
3802 break;
3803 pm = get_cache(p->synode);
3804 if(p->force_delivery)
3805 pm->force_delivery = 1;
3806 if (!pm->proposer.msg)
3807 break;
3808 assert(pm && pm->proposer.msg);
3809 handle_ack_accept(site, pm, p);
3810 break;
3811 case recover_learn_op:
3812 DBGOUT(FN; STRLIT("recover_learn_op receive "); SYCEXP(p->synode));
3813 pm = get_cache(p->synode);
3814 assert(pm);
3815 if(p->force_delivery)
3816 pm->force_delivery = 1;
3817 pm->last_modified = task_now();
3818 update_max_synode(p);
3819 {
3820 DBGOUT(FN; STRLIT("recover_learn_op learn "); SYCEXP(p->synode));
3821 p->op = learn_op;
3822 handle_learn(site, pm, p);
3823 }
3824 break;
3825 case learn_op:
3826 learnop:
3827 pm = get_cache(p->synode);
3828 assert(pm);
3829 if(p->force_delivery)
3830 pm->force_delivery = 1;
3831 pm->last_modified = task_now();
3832 update_max_synode(p);
3833 activate_sweeper();
3834 handle_learn(site, pm, p);
3835 break;
3836 case tiny_learn_op:
3837 if (p->msg_type == no_op)
3838 goto learnop;
3839 pm = get_cache(p->synode);
3840 assert(pm);
3841 if(p->force_delivery)
3842 pm->force_delivery = 1;
3843 if (pm->acceptor.msg) {
3844 /* BALCEXP(pm->acceptor.msg->proposal); */
3845 if (eq_ballot(pm->acceptor.msg->proposal, p->proposal)) {
3846 pm->acceptor.msg->op = learn_op;
3847 pm->last_modified = task_now();
3848 update_max_synode(p);
3849 activate_sweeper();
3850 handle_learn(site, pm, pm->acceptor.msg);
3851 } else {
3852 send_read(p->synode);
3853 DBGOUT(FN; STRLIT("tiny_learn"); SYCEXP(p->synode);
3854 BALCEXP(pm->acceptor.msg->proposal); BALCEXP(p->proposal));
3855 }
3856 } else {
3857 send_read(p->synode);
3858 DBGOUT(FN; STRLIT("tiny_learn"); SYCEXP(p->synode);
3859 BALCEXP(p->proposal));
3860 }
3861 break;
3862 case skip_op:
3863 pm = get_cache(p->synode);
3864 assert(pm);
3865 if(p->force_delivery)
3866 pm->force_delivery = 1;
3867 pm->last_modified = task_now();
3868 handle_skip(site, pm, p);
3869 break;
3870 case i_am_alive_op:
3871 handle_alive(site, reply_queue, p);
3872 break;
3873 case are_you_alive_op:
3874 handle_alive(site, reply_queue, p);
3875 break;
3876 case need_boot_op:
3877 /* purecov: begin deadcode */
3878 XCOM_FSM(xa_need_snapshot, void_arg(p));
3879 break;
3880 /* purecov: end */
3881 case snapshot_op:
3882 if (!is_dead_site(p->group_id)) {
3883 update_max_synode(p);
3884 }
3885 break;
3886 case gcs_snapshot_op:
3887 if (!is_dead_site(p->group_id)) {
3888 update_max_synode(p);
3889 XCOM_FSM(xa_snapshot, void_arg(p));
3890 XCOM_FSM(xa_complete,int_arg(0));
3891 }
3892 break;
3893 case die_op:
3894 /* assert("die horribly" == "need coredump"); */
3895 {
3896 GET_GOUT;
3897 FN;
3898 STRLIT("die_op ");
3899 SYCEXP(executed_msg);
3900 SYCEXP(delivered_msg);
3901 SYCEXP(p->synode);
3902 SYCEXP(p->delivered_msg);
3903 SYCEXP(p->max_synode);
3904 PRINT_GOUT;
3905 FREE_GOUT;
3906 }
3907 /*
3908 If the message with the number in the incoming die_op message
3909 already has been executed (delivered), then it means that we
3910 actually got consensus on it, since otherwise we would not have
3911 delivered it.Such a situation could arise if one of the nodes has
3912 expelled the message from its cache, but others have not. So when
3913 sending out a request, we might get two different answers, one
3914 indicating that we are too far behind and should restart, and
3915 another with the actual consensus value. If the value arrives
3916 first, we will deliver it, and then the die_op may arrive later.
3917 But it this case it does not matter, since we got what we needed
3918 anyway. It is only a partial guard against exiting without really
3919 needing it of course, since the die_op may arrive first, and we
3920 do not wait for a die_op from all the other nodes. We could do
3921 that with some extra housekeeping in the pax_machine (a new bit
3922 vector), but I am not convinced that it is worth the effort.
3923 */
3924 if(!synode_lt(p->synode, executed_msg)){
3925 g_critical("Node %u unable to get messages, since the "
3926 "group is too far ahead. Node will now exit.",
3927 get_nodeno(site));
3928 terminate_and_exit();
3929 }
3930 default:
3931 break;
3932 }
3933 if (oom_abort) {
3934 g_critical("Node %u has run out of memory and will now exit.",
3935 get_nodeno(site));
3936 terminate_and_exit();
3937 }
3938 return(p);
3939 }
3940
3941
3942
3943 /* }}} */
3944
3945 /* {{{ Acceptor-learner task */
/*
  Acceptor-learner task: serves one incoming connection.

  arg carries the accepted socket fd (get_int_arg). The task makes the socket
  non-blocking, optionally performs the SSL server handshake, and then, until
  xcom_shutdown is set, reads one pax_msg per iteration, dispatches it with
  dispatch_op() and writes every reply that dispatch_op() queued on
  ep->reply_queue back on the same connection. It leaves the loop when a read
  returns n <= 0. FINALLY releases the current message, pending replies, the
  serialization and read buffers, the connection and the cached server
  reference.
*/
int acceptor_learner_task(task_arg arg)
{
  DECL_ENV
  connection_descriptor rfd;
  srv_buf *in_buf;

  pax_msg * p;
  u_int buflen;
  char *buf;
  linkage reply_queue;
  int errors;
  server *srv;
  END_ENV;

  TASK_BEGIN

  /* Buffer used by buffered_read_msg(). NOTE(review): the calloc result is
     not checked for NULL. */
  ep->in_buf = calloc(1, sizeof(srv_buf));

  ep->rfd.fd = get_int_arg(arg);
#ifdef XCOM_HAVE_OPENSSL
  ep->rfd.ssl_fd = 0;
#endif
  ep->p = NULL;
  ep->buflen = 0;
  ep->buf = NULL;
  ep->errors = 0;
  ep->srv = 0;

  /* We have a connection, make socket non-blocking and wait for request */
  unblock_fd(ep->rfd.fd);
  set_nodelay(ep->rfd.fd);
  wait_io(stack, ep->rfd.fd, 'r');
  TASK_YIELD;

#ifdef XCOM_HAVE_OPENSSL
  if (xcom_use_ssl()) {
    ep->rfd.ssl_fd = SSL_new(server_ctx);
    SSL_set_fd(ep->rfd.ssl_fd, ep->rfd.fd);

    {
      int ret_ssl;
      int err;
      ERR_clear_error();
      ret_ssl = SSL_accept(ep->rfd.ssl_fd);
      err = SSL_get_error(ep->rfd.ssl_fd, ret_ssl);

      /* Non-blocking handshake: wait for the fd in the direction SSL asks
         for and retry SSL_accept(); any other SSL error ends the loop. */
      while (ret_ssl != SSL_SUCCESS) {
        if (err == SSL_ERROR_WANT_READ){
          wait_io(stack, ep->rfd.fd, 'r');
        } else if (err == SSL_ERROR_WANT_WRITE){
          wait_io(stack, ep->rfd.fd, 'w');
        } else { /* Some other error, give up */
          break;
        }
        TASK_YIELD;
        SET_OS_ERR(0);
        G_DEBUG("acceptor learner accept retry fd %d", ep->rfd.fd);
        ERR_clear_error();
        ret_ssl = SSL_accept(ep->rfd.ssl_fd);
        err = SSL_get_error(ep->rfd.ssl_fd, ret_ssl);
      }

      /* Handshake failed: release SSL state, close and end the task. */
      if (ret_ssl != SSL_SUCCESS) {
        ssl_free_con(&ep->rfd);
        close_connection(&ep->rfd);
        TERMINATE;
      }
    }

  } else {
    ep->rfd.ssl_fd = 0;
  }
#endif
  set_connected(&ep->rfd, CON_FD);
  link_init(&ep->reply_queue, type_hash("msg_link"));

  while (!xcom_shutdown) {
    int64_t n;
    site_def const * site = 0;
    /* Fresh message for every read; replaces (presumably releasing) the
       previous one. */
    unchecked_replace_pax_msg(&ep->p, pax_msg_new_0(null_synode));

    if(use_buffered_read){
      TASK_CALL(buffered_read_msg(&ep->rfd, ep->in_buf, ep->p, ep->srv, &n));
    }else{
      TASK_CALL(read_msg(&ep->rfd, ep->p, ep->srv, &n));
    }
    /* Reject ops outside [client_msg, LAST_OP] before op is used as an
       index into the receive/send statistics arrays. */
    if (((int)ep->p->op < (int)client_msg || ep->p->op > LAST_OP)) {
      /* invalid operation, ignore message */
      delete_pax_msg(ep->p);
      ep->p = NULL;
      TASK_YIELD;
      continue;
    }
    /* Connection closed or read error: leave the loop and clean up. */
    if (n <= 0) {
      break;
    }
    site = find_site_def(ep->p->synode);
    /*
      Getting a pointer to the server needs to be done after we have
      received a message, since without having received a message, we
      cannot know who it is from. We could peek at the message and
      deserialize the message number and from field, but since the server
      does not change, it should be sufficient to cache the server in
      the acceptor_learner task. A cleaner solution would have been to
      move the timestamps out of the server object, and have a map
      indexed by IP/port or UUID to track the timestamps, since this is
      common to both the sender_task, reply_handler_task, and the
      acceptor_learner_task.
    */
    // Allow the previous server reference to be freed.
    if (ep->srv) srv_unref(ep->srv);
    ep->srv = get_server(site, ep->p->from);
    // Prevent the new server reference from being freed.
    if (ep->srv) srv_ref(ep->srv);
    ep->p->refcnt = 1; /* Refcnt from other end is void here */
    MAY_DBG(FN;
            NDBG(ep->rfd.fd, d); NDBG(task_now(), f);
            COPY_AND_FREE_GOUT(dbg_pax_msg(ep->p));
           );
    receive_count[ep->p->op]++;
    receive_bytes[ep->p->op] += (uint64_t)n + MSG_HDR_SIZE;
    {
      /* The sender is "behind" when its message number is older than the
         last message delivered here (only computed if the site has nodes). */
      gboolean behind = FALSE;
      if (get_maxnodes(site) > 0) {
        behind = ep->p->synode.msgno < delivered_msg.msgno;
      }
      ADD_EVENTS(
        add_event(string_arg("before dispatch "));
        add_synode_event(ep->p->synode);
        add_event(string_arg("ep->p->from"));
        add_event(int_arg(ep->p->from));
        add_event(string_arg(pax_op_to_str(ep->p->op)));
      );
      if (ep->p->msg_type == normal ||
          ep->p->synode.msgno == 0 || /* Used by i-am-alive and so on */
          is_cached(ep->p->synode) || /* Already in cache */
          (!behind)) { /* Guard against cache pollution from other nodes */
        dispatch_op(site, ep->p, &ep->reply_queue);

        /* Send replies on same fd */
        while (!link_empty(&ep->reply_queue)) {
          msg_link * reply = (msg_link * )(link_extract_first(&ep->reply_queue));
          MAY_DBG(FN;
                  COPY_AND_FREE_GOUT(dbg_linkage(&ep->reply_queue));
                  COPY_AND_FREE_GOUT(dbg_msg_link(reply));
                  COPY_AND_FREE_GOUT(dbg_pax_msg(reply->p));
                 );
          assert(reply->p);
          assert(reply->p->refcnt > 0);
          /* Address the reply back to the sender and piggyback this node's
             delivered/max synode. */
          reply->p->to = ep->p->from;
          reply->p->from = ep->p->to;
          reply->p->delivered_msg = get_delivered_msg();
          reply->p->max_synode = get_max_synode();
          serialize_msg(reply->p, ep->rfd.x_proto, &ep->buflen, &ep->buf);
          MAY_DBG(FN; COPY_AND_FREE_GOUT(dbg_msg_link(reply));
                  COPY_AND_FREE_GOUT(dbg_pax_msg(reply->p)));
          msg_link_delete(&reply);
          if(ep->buflen){
            int64_t sent;
            TASK_CALL(task_write(&ep->rfd , ep->buf, ep->buflen, &sent));
            /* NOTE(review): replies are counted under the incoming op
               (ep->p->op), not the reply's own op. */
            send_count[ep->p->op]++;
            send_bytes[ep->p->op] += ep->buflen;
            X_FREE(ep->buf);
          }
          ep->buf = NULL;
        }
      } else {
        /* Rejected: an old, uncached message from a node that is behind. */
        DBGOUT(FN; STRLIT("rejecting ");
               STRLIT(pax_op_to_str(ep->p->op));
               NDBG(ep->p->from, d); NDBG(ep->p->to, d);
               SYCEXP(ep->p->synode);
               BALCEXP(ep->p->proposal));
        if (xcom_booted() && behind) {
#ifdef USE_EXIT_TYPE
          if (ep->p->op == prepare_op) {
            miss_prepare(ep->p, &ep->reply_queue);
          } else if (ep->p->op == accept_op) {
            miss_accept(ep->p, &ep->reply_queue);
          }
#else
          /* The requested synode has been evicted from the cache: answer
             with a die_op so the lagging node can stop (see the die_op
             handling in dispatch_op). */
          if (/*ep->p->op == prepare_op && */ was_removed_from_cache(ep->p->synode)) {
            DBGOUT(FN; STRLIT("send_die ");
                   STRLIT(pax_op_to_str(ep->p->op));
                   NDBG(ep->p->from, d); NDBG(ep->p->to, d);
                   SYCEXP(ep->p->synode);
                   BALCEXP(ep->p->proposal));
            if (get_maxnodes(site) > 0) {
              pax_msg * np = NULL;
              np = pax_msg_new(ep->p->synode, site);
              ref_msg(np);
              np->op = die_op;
              np->to = ep->p->from;
              np->from = ep->p->to;
              np->delivered_msg = get_delivered_msg();
              np->max_synode = get_max_synode();
              DBGOUT(FN; STRLIT("sending die_op to node "); NDBG(np->to, d);
                     SYCEXP(executed_msg); SYCEXP(max_synode); SYCEXP(np->synode));
              serialize_msg(np, ep->rfd.x_proto, &ep->buflen, &ep->buf);
              if(ep->buflen){
                int64_t sent;
                TASK_CALL(task_write(&ep->rfd , ep->buf, ep->buflen, &sent));
                send_count[ep->p->op]++;
                send_bytes[ep->p->op] += ep->buflen;
                X_FREE(ep->buf);
              }
              ep->buf = NULL;
              unref_msg(&np);
            }

          }
#endif
        }
      }
    }
    /* TASK_YIELD; */
  }

  FINALLY
  MAY_DBG(FN; STRLIT(" shutdown "); NDBG(ep->rfd.fd, d ); NDBG(task_now(), f));
  /* reply_queue.suc is only set once link_init() has run. */
  if(ep->reply_queue.suc && !link_empty(&ep->reply_queue))
    empty_msg_list(&ep->reply_queue);
  unchecked_replace_pax_msg(&ep->p, NULL);
  shutdown_connection(&ep->rfd);
  DBGOUT(FN; NDBG(xcom_shutdown, d));
  if (ep->buf)
    X_FREE(ep->buf);
  free(ep->in_buf);
  // Allow the server reference to be freed.
  if (ep->srv) srv_unref(ep->srv);

  TASK_END;
}
4178
4179
4180 /* }}} */
4181
4182 /* {{{ Reply handler task */
/* Nonzero: reply_handler_task answers need_boot_op itself instead of passing
   it to dispatch_op(). */
const int need_boot_special = 1;
4184
4185 static void server_handle_need_snapshot(server *srv, site_def const *s, node_no node);
4186
/*
  Reply handler task: reads messages arriving on the connection of the server
  passed in arg (get_void_arg) and dispatches them.

  The task holds a reference on the server for its whole lifetime (srv_ref
  here, srv_unref in FINALLY). While the connection is down it waits with
  TASK_DELAY(1.000) between checks. A read returning n <= 0 shuts the
  connection down and the task goes back to waiting. Messages whose op lies
  outside [client_msg, LAST_OP] are discarded before op is used as an array
  index. need_boot_op is answered directly; every other message is passed
  to dispatch_op() only while the server is still valid.
*/
int reply_handler_task(task_arg arg)
{
  DECL_ENV
  server * s;
  pax_msg * reply;
  END_ENV;

  TASK_BEGIN

  ep->s = (server * )get_void_arg(arg);
  srv_ref(ep->s);
  ep->reply = NULL;

  for (; ; ) {
    while (!is_connected(&ep->s->con)) {
      MAY_DBG(FN; STRLIT("waiting for connection"));
      TASK_DELAY(1.000);
    }
    {
      int64_t n;
      unchecked_replace_pax_msg(&ep->reply, pax_msg_new_0( null_synode));

      ADD_EVENTS(
        add_event(string_arg("ep->s->con.fd"));
        add_event(int_arg(ep->s->con.fd));
      );
      TASK_CALL(read_msg(&ep->s->con, ep->reply, ep->s, &n));
      ADD_EVENTS(
        add_event(string_arg("ep->s->con.fd"));
        add_event(int_arg(ep->s->con.fd));
      );
      ep->reply->refcnt = 1; /* Refcnt from other end is void here */
      if (n <= 0) {
        shutdown_connection(&ep->s->con);
        continue;
      }
      /*
        The op comes straight off the wire. Reject anything outside
        [client_msg, LAST_OP] before it indexes receive_bytes/receive_count
        or reaches dispatch_op(); this mirrors the check done in
        acceptor_learner_task. The message is replaced at the top of the
        next iteration.
      */
      if ((int)ep->reply->op < (int)client_msg || ep->reply->op > LAST_OP) {
        /* invalid operation, ignore message */
        TASK_YIELD;
        continue;
      }
      receive_bytes[ep->reply->op] += (uint64_t)n + MSG_HDR_SIZE;
    }
    MAY_DBG(FN;
            NDBG(ep->s->con.fd, d); NDBG(task_now(), f);
            COPY_AND_FREE_GOUT(dbg_pax_msg(ep->reply));
           );
    receive_count[ep->reply->op]++;

    /* Special test for need_snapshot, since node and site may not be consistent */
    if (need_boot_special && ep->reply->op == need_boot_op) {
      pax_msg * p = ep->reply;
      server_handle_need_snapshot(ep->s, get_site_def(), p->from);
    }else{
      // We only handle messages from this connection if the server is valid.
      if(ep->s->invalid == 0)
        dispatch_op(find_site_def(ep->reply->synode), ep->reply, NULL);
    }
    TASK_YIELD;
  }

  FINALLY
  replace_pax_msg(&ep->reply, NULL);

  shutdown_connection(&ep->s->con);
  ep->s->reply_handler = NULL;
  MAY_DBG(FN; STRLIT(" shutdown "); NDBG(ep->s->con.fd, d); NDBG(task_now(), f));
  srv_unref(ep->s);

  TASK_END;
}
4253
4254
4255 /* }}} */
4256
4257
4258 /* purecov: begin deadcode */
/* Block the calling thread for the given number of seconds. */
static inline void xcom_sleep(unsigned int seconds)
{
#if defined(WIN32) || defined(WIN64)
  DWORD const millis = (DWORD)seconds * 1000; /* Sleep() takes milliseconds */
  Sleep(millis);
#else
  (void)sleep(seconds);
#endif
}
4267
4268 /* purecov: end */
4269
4270 /*
4271 * Get a unique long as the basis for XCom group id creation.
4272 *
4273 * NOTE:
 * As there is no gethostid() on Windows, we use the current time in seconds
 * since the epoch instead (XORed with the process id), so ids may collide if
 * sites are created simultaneously within the same second.
4276 */
/*
  Return a value used as the basis for XCom group id creation.

  POSIX: host id XOR process id.
  Windows (no gethostid()): current time in seconds since the epoch XOR
  process id.
*/
long get_unique_long(void)
{
#if defined(WIN32) || defined(WIN64)
  __time64_t ltime;

  /* Fixed: the address-of operator was garbled into "<ime", which did not
     compile. */
  _time64(&ltime);
  return (long) (ltime ^ GetCurrentProcessId());
#else
  return gethostid() ^ getpid();
#endif
}
4288
4289
4290 /* {{{ Coroutine macros */
4291 /*
4292 Coroutine device (or more precisely, a finite state machine, as the
4293 stack is not preserved), described by its inventor Tom Duff as
4294 being "too horrid to go into". The basic idea is that the switch
4295 can be used to jump anywhere in the code, so we note where we are
4296 when we return, and jump there when we enter the routine again by
4297 switching on the state, which is really a line number supplied by
4298 the CO_RETURN macro.
4299 */
4300
/*
  Open the coroutine switch. Requires an int `state` in scope that keeps its
  value between calls (xcom_fsm uses a static int); a fresh coroutine has
  state == 0.
*/
#define CO_BEGIN switch(state){ default: assert(state == 0); case 0:
/* Close the switch opened by CO_BEGIN. */
#define CO_END }

/*
  Record the current source line in `state`, return x, and on the next call
  resume immediately after this macro through the `case __LINE__` label.
  Must be used inside the coroutine's switch; two CO_RETURNs on the same
  source line would produce duplicate case labels.
*/
#define CO_RETURN(x) \
{ \
state = __LINE__; \
return x; \
case __LINE__:; \
}

/* Return x on this call and on every subsequent call. */
#define HALT(x) while(1){ CO_RETURN(x);}
4312
4313 /* purecov: begin deadcode */
/* Wrap app data `a` in a new pax_msg for the proposer site and pass both to
   xcom_send(). */
void send_app_data(app_data_ptr a)
{
  xcom_send(a, pax_msg_new(null_synode, get_proposer_site()));
}
4319
xcom_send_data(uint32_t size,char * data)4320 void xcom_send_data(uint32_t size, char *data)
4321 {
4322 app_data_ptr a = new_app_data();
4323 a->body.c_t = app_type;
4324 a->body.app_u_u.data.data_len = size;
4325 a->body.app_u_u.data.data_val = data;
4326 send_app_data(a);
4327 }
4328
create_config(node_list * nl,cargo_type type)4329 app_data_ptr create_config(node_list *nl, cargo_type type)
4330 {
4331 app_data_ptr a = new_app_data();
4332 a->body.c_t = type;
4333 init_node_list(nl->node_list_len, nl->node_list_val, &a->body.app_u_u.nodes);
4334 return a;
4335 }
4336 /* purecov: end */
4337
init_config_with_group(app_data * a,node_list * nl,cargo_type type,uint32_t group_id)4338 app_data_ptr init_config_with_group(app_data *a, node_list *nl, cargo_type type,
4339 uint32_t group_id)
4340 {
4341 init_app_data(a);
4342 a->app_key.group_id = a->group_id = group_id;
4343 a->body.c_t = type;
4344 init_node_list(nl->node_list_len, nl->node_list_val, &a->body.app_u_u.nodes);
4345 return a;
4346 }
4347
4348 /* purecov: begin deadcode */
create_config_with_group(node_list * nl,cargo_type type,uint32_t group_id)4349 app_data_ptr create_config_with_group(node_list *nl, cargo_type type,
4350 uint32_t group_id)
4351 {
4352 app_data_ptr a = new_app_data();
4353 return init_config_with_group(a, nl, type, group_id);
4354 }
4355
/* Install a unified boot config for `nl` locally, then propose it. */
void send_boot(node_list *nl)
{
  app_data_ptr boot_cfg = create_config(nl, unified_boot_type);

  /* Cannot get consensus unless group is known */
  install_node_group(boot_cfg);
  send_app_data(boot_cfg);
}
4361
/* Propose adding the nodes in `nl`. */
void send_add_node(node_list *nl)
{
  app_data_ptr cfg = create_config(nl, add_node_type);
  send_app_data(cfg);
}
4366
/* Propose removing the nodes in `nl`. */
void send_remove_node(node_list *nl)
{
  app_data_ptr cfg = create_config(nl, remove_node_type);
  send_app_data(cfg);
}
4371
/* Propose a forced configuration consisting of the nodes in `nl`. */
void send_config(node_list *nl)
{
  app_data_ptr cfg = create_config(nl, force_config_type);
  send_app_data(cfg);
}
4376
/*
  Start a client_task that delivers app data `a` as a client_msg to
  srv:port. The host name is duplicated with strdup(); `a` is attached to
  the message.
*/
void send_client_app_data(char *srv, xcom_port port, app_data_ptr a)
{
  pax_msg *req = pax_msg_new(null_synode, 0);
  envelope *env = calloc(1, sizeof(envelope));

  req->op = client_msg;
  req->to = VOID_NODE_NO;
  req->a = a;

  env->p = req;
  env->srv = strdup(srv);
  env->port = port;
  env->crash_on_error = 0;

  task_new(client_task, void_arg(env), "client_task", XCOM_THREAD_DEBUG);
}
4391
/* Ask the xcom at srv:port to boot with the nodes in `nl`. */
void send_client_boot(char *srv, xcom_port port, node_list *nl)
{
  app_data_ptr cfg = create_config(nl, unified_boot_type);
  send_client_app_data(srv, port, cfg);
}
4396
/* Ask the xcom at srv:port to add the nodes in `nl`. */
void send_client_add_node(char *srv, xcom_port port, node_list *nl)
{
  app_data_ptr cfg = create_config(nl, add_node_type);
  send_client_app_data(srv, port, cfg);
}
4401
/* Ask the xcom at srv:port to remove the nodes in `nl`. */
void send_client_remove_node(char *srv, xcom_port port, node_list *nl)
{
  app_data_ptr cfg = create_config(nl, remove_node_type);
  send_client_app_data(srv, port, cfg);
}
4406
/* Ask the xcom at srv:port to force the configuration `nl`. */
void send_client_config(char *srv, xcom_port port, node_list *nl)
{
  app_data_ptr cfg = create_config(nl, force_config_type);
  send_client_app_data(srv, port, cfg);
}
4411 /* purecov: end */
4412
/*
  Send gcs_snap to `node` over server `srv` as a gcs_snapshot_op message
  numbered gcs_snap->log_start. Site `s` supplies the sender node number and
  group id.
*/
static void server_send_snapshot(server *srv, site_def const *s, gcs_snapshot *gcs_snap, node_no node)
{
  pax_msg *snap_msg = pax_msg_new(gcs_snap->log_start, get_site_def());

  ref_msg(snap_msg);
  snap_msg->gcs_snap = gcs_snap;
  snap_msg->op = gcs_snapshot_op;
  send_msg(srv, s->nodeno, node, get_group_id(s), snap_msg);
  unref_msg(&snap_msg);
}
4422
4423 /* purecov: begin deadcode */
/* Send gcs_snap to `node` using that node's server entry in site `s`. */
static void send_snapshot(site_def const *s, gcs_snapshot *gcs_snap, node_no node)
{
  server *target = s->servers[node];

  assert(target);
  server_send_snapshot(target, s, gcs_snap, node);
}
4429 /* purecov: end */
4430
/*
  Replay every finished, cached message from `push` up to the current
  max synode to `node` over `srv`, each sent as a recover_learn_op.
*/
static void server_push_log(server *srv, synode_no push, node_no node)
{
  site_def const *site = get_site_def();

  for (; !synode_gt(push, get_max_synode()); push = incr_synode(push)) {
    pax_machine *machine;
    pax_msg *copy;

    if (!is_cached(push))
      continue;
    machine = get_cache_no_touch(push);
    if (!pm_finished(machine))
      continue;
    /* Need to clone message here since pax_machine may be re-used while message is sent */
    copy = clone_pax_msg(machine->learner.msg);
    if (copy == NULL)
      continue;
    ref_msg(copy);
    copy->op = recover_learn_op;
    send_msg(srv, site->nodeno, node, get_group_id(site), copy);
    unref_msg(&copy);
  }
}
4451
4452 /* purecov: begin deadcode */
/* Push the log from `push` to `node` via its server in the current site. */
static void push_log(synode_no push, node_no node)
{
  site_def const *site = get_site_def();
  server *target = site->servers[node];

  assert(target);
  server_push_log(target, push, node);
}
4459 /* purecov: end */
4460
4461 static app_snap_getter get_app_snap;
4462 static app_snap_handler handle_app_snap;
4463
4464 /* purecov: begin deadcode */
/*
  Answer a snapshot request from `node`: export the configuration, start the
  log at the application snapshot if that is older, send the snapshot and
  then push the log from that point.
*/
static void handle_need_snapshot(site_def const *s, node_no node)
{
  gcs_snapshot *snap = export_config();
  synode_no app_lsn = get_app_snap(&snap->app_snap);
  int const app_is_older =
      !synode_eq(null_synode, app_lsn) && synode_lt(app_lsn, snap->log_start);

  if (app_is_older) {
    snap->log_start = app_lsn;
  }
  send_snapshot(s, snap, node);
  push_log(snap->log_start, node);
}
4474 /* purecov: end */
4475
/*
  Answer a snapshot request from `node` over server `srv`. The log start is
  the application snapshot if that is older than the exported config;
  otherwise it is last_config_modification_id when that is set.
*/
static void server_handle_need_snapshot(server *srv, site_def const *s, node_no node)
{
  gcs_snapshot *snap = export_config();
  synode_no app_lsn = get_app_snap(&snap->app_snap);
  int const app_is_older =
      !synode_eq(null_synode, app_lsn) && synode_lt(app_lsn, snap->log_start);

  if (app_is_older) {
    snap->log_start = app_lsn;
  } else if (!synode_eq(null_synode, last_config_modification_id)) {
    snap->log_start = last_config_modification_id;
  }

  server_send_snapshot(srv, s, snap, node);
  server_push_log(srv, snap->log_start, node);
}
4490
4491 #define X(b) #b,
4492 const char *xcom_state_name[] = {
4493 x_state_list
4494 };
4495
4496 const char *xcom_actions_name[] = {
4497 x_actions
4498 };
4499 #undef X
4500
/*
  The xcom state machine, written as a switch-based coroutine: the static
  `state` records the CO_RETURN line where the previous call returned, and
  each call resumes there to process the new action. The first call runs
  xcom_thread_init().

  States returned to the caller:
  - x_start:   xa_init resets the shutdown/alive/oom flags; xa_u_boot and
               xa_net_boot install a node group and continue at run;
               xa_add asks a remote node to add nl; xa_snapshot continues at
               recover; xa_exit tears xcom down and calls xcom_exit_cb.
  - x_recover: the gcs_snapshot in the xa_snapshot argument has been
               imported; xa_complete continues at run, xa_terminate goes back
               to start.
  - x_run:     proposers and the executor, sweeper, detector and alive tasks
               have been started. xa_terminate stops them, resets state and
               goes back to start; xa_need_snapshot and xa_force_config are
               handled in place.

  @param action  the event to process
  @param fsmargs action-specific argument: node_list* (xa_u_boot),
                 add_args* (xa_add), app_data* (xa_net_boot, xa_force_config),
                 pax_msg* (xa_snapshot, xa_need_snapshot) or an int code
                 (xa_exit, xa_terminate)
  @return the state the machine is in after processing the action
*/
xcom_state xcom_fsm(xcom_actions action, task_arg fsmargs)
{
  static int state = 0;
  G_DEBUG("state %d action %s", state, xcom_actions_name[action]);
  switch (state) {
  default:
    assert(state == 0);
  case 0:
    /* Initialize basic xcom data */
    xcom_thread_init();
  start:
    for (; ; ) {
      if (action == xa_init) {
        xcom_shutdown = 0;
        sent_alive = 0.0;
        oom_abort = 0;
      }
      if (action == xa_u_boot) {
        /* purecov: begin deadcode */
        node_list * nl = get_void_arg(fsmargs);
        app_data_ptr a = create_config(nl, unified_boot_type);
        install_node_group(a); /* Cannot get consensus unless group is known */
        send_app_data(a);
        set_executed_msg(incr_msgno(get_site_def()->start));
        goto run;
        /* purecov: end */
      }
      if (action == xa_add) {
        /* purecov: begin deadcode */
        add_args * a = get_void_arg(fsmargs);
        send_client_add_node(a->addr, a->port, a->nl);
        /* purecov: end */
      }
      if (action == xa_net_boot) {
        app_data * a = get_void_arg(fsmargs);
        install_node_group(a);
        set_executed_msg(incr_msgno(get_site_def()->start));
        goto run;
      }
      if (action == xa_snapshot) {
        goto recover;
      }
      if (action == xa_exit) {
        /* Xcom is finished when we get here */
        bury_site(get_group_id(get_site_def()));
        task_terminate_all(); /* Kill, kill, kill, kill, kill, kill. This is the end. */

        init_xcom_base(); /* Reset shared variables */
        init_tasks(); /* Reset task variables */
        free_site_defs();
        free_forced_config_site_def();
        garbage_collect_servers();
        DBGOUT(FN; STRLIT("shutting down"));
        xcom_shutdown = 1;
        if(xcom_exit_cb)
          xcom_exit_cb(get_int_arg(fsmargs));
        G_DEBUG("Exiting xcom thread");
      }
      CO_RETURN(x_start);
    }
  recover:
    {
      /* Install configuration, application state and log start from the
         received gcs_snapshot. */
      pax_msg * p = get_void_arg(fsmargs);
      import_config(p->gcs_snap);
      handle_app_snap(&p->gcs_snap->app_snap);
      set_executed_msg(p->gcs_snap->log_start);

      set_last_received_config(p->gcs_snap->log_start);

      DBGOUT(FN; SYCEXP(executed_msg); );
      for (; ; ) {
        if (action == xa_terminate) {
          goto start;
        }
        if (action == xa_complete) {
          goto run;
        }

        CO_RETURN(x_recover);
      }
    }
  run:
    DBGOUT(FN; SYCEXP(executed_msg); );
    if(xcom_run_cb)
      xcom_run_cb(0);
    force_recover = 0;
    client_boot_done = 1;
    netboot_ok = 1;
    booting = 0;
    set_proposer_startpoint();
    create_proposers();
    set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
    set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
    set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
    set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));

    for (; ; ) {
      if (action == xa_terminate) {
        /* Stop everything started at run and reset to the initial state. */
        force_recover = 0;
        client_boot_done = 0;
        netboot_ok = 0;
        booting = 0;
        oom_abort = 0;
        terminate_proposers();
        init_proposers();
        task_terminate(executor);
        set_task(&executor, NULL);
        task_terminate(sweeper);
        set_task(&sweeper, NULL);
        task_terminate(detector);
        set_task(&detector, NULL);
        task_terminate(alive_t);
        set_task(&alive_t, NULL);

        init_xcom_base(); /* Reset shared variables */
        free_site_defs();
        free_forced_config_site_def();
        garbage_collect_servers();
        if(xcom_terminate_cb)
          xcom_terminate_cb(get_int_arg(fsmargs));
        goto start;
      }
      if (action == xa_need_snapshot) {
        pax_msg * p = get_void_arg(fsmargs);
        handle_need_snapshot(find_site_def(p->synode), p->from);
      }
      if (action == xa_force_config) {
        /* New site starting at executed_msg; servers not in it are
           invalidated before the forced config is started. */
        app_data * a = get_void_arg(fsmargs);
        site_def *s = create_site_def_with_start(a, executed_msg);
        s->boot_key = executed_msg;
        invalidate_servers(get_site_def(), s);
        start_force_config(s);
      }
      CO_RETURN(x_run);
    }
  }
}
4638
4639 /* purecov: begin deadcode */
/*
  If addr:port is this node, boot xcom with `nl`; otherwise ask the node at
  addr:port to add `nl`.
*/
void xcom_add_node(char *addr, xcom_port port, node_list *nl)
{
  add_args a;

  if (xcom_mynode_match(addr, port)) {
    XCOM_FSM(xa_u_boot, void_arg(nl)); /* Boot */
    return;
  }
  a.addr = addr;
  a.port = port;
  a.nl = nl;
  XCOM_FSM(xa_add, void_arg(&a)); /* Only initialize xcom */
}
4652
4653
/*
  `addr` is a "host:port" style address parsed with xcom_get_name() and
  xcom_get_port(). If it names this node, boot a single-node group
  containing addr; otherwise ask that node to add `nl`. The parsed host name
  is freed before returning.
*/
void xcom_fsm_add_node(char *addr, node_list *nl)
{
  xcom_port const node_port = xcom_get_port(addr);
  char *node_addr = xcom_get_name(addr);

  if (!xcom_mynode_match(node_addr, node_port)) {
    add_args a;
    a.addr = node_addr;
    a.port = node_port;
    a.nl = nl;
    XCOM_FSM(xa_add, void_arg(&a));
  } else {
    node_list self;
    self.node_list_len = 1;
    self.node_list_val = new_node_address(self.node_list_len, &addr);
    XCOM_FSM(xa_u_boot, void_arg(&self));
    delete_node_address(self.node_list_len, self.node_list_val);
  }
  free(node_addr);
}
4674 /* purecov: end */
4675
/* Install the callback that applies an application snapshot during
   recovery. */
void set_app_snap_handler(app_snap_handler x)
{
  app_snap_handler const new_handler = x;
  handle_app_snap = new_handler;
}
4680
/* Install the callback that captures the application snapshot when another
   node asks for one. */
void set_app_snap_getter(app_snap_getter x)
{
  app_snap_getter const new_getter = x;
  get_app_snap = new_getter;
}
4685
4686 /* Initialize sockaddr based on server and port */
/*
  Resolve `server` and fill *sock_addr / *sock_size with its first IPv4
  address, using `port` (host byte order).

  checked_getaddrinfo() is called without hints, so the result list may
  contain non-IPv4 entries (e.g. AF_INET6, whose address is larger than a
  struct sockaddr_in). Copying such an entry would overflow *sock_addr, so
  only an AF_INET entry whose length fits is used.

  @return 1 on success, 0 if resolution failed or no usable IPv4 address
          was found.
*/
static int init_sockaddr(char *server, struct sockaddr_in *sock_addr,
                         socklen_t *sock_size, xcom_port port)
{
  /* Get address of server */
  struct addrinfo *addr = 0;
  struct addrinfo *cur = 0;

  checked_getaddrinfo(server, 0, 0, &addr);

  if (addr == 0) {
    return 0;
  }

  /* Pick the first IPv4 address that fits into *sock_addr */
  for (cur = addr; cur != 0; cur = cur->ai_next) {
    if (cur->ai_family == AF_INET && cur->ai_addr != 0 &&
        (size_t)cur->ai_addrlen <= sizeof(*sock_addr))
      break;
  }

  if (cur == 0) {
    freeaddrinfo(addr);
    return 0;
  }

  memset(sock_addr, 0, sizeof(*sock_addr));
  memcpy(sock_addr, cur->ai_addr, cur->ai_addrlen);
  *sock_size = (socklen_t)cur->ai_addrlen;
  sock_addr->sin_port = htons(port);
  freeaddrinfo(addr);

  return 1;
}
4707
4708
checked_create_socket(int domain,int type,int protocol)4709 static result checked_create_socket(int domain, int type, int protocol)
4710 {
4711 result retval = {0,0};
4712 int retry = 1000;
4713
4714 do {
4715 SET_OS_ERR(0);
4716 retval.val = socket(domain, type, protocol);
4717 retval.funerr = to_errno(GET_OS_ERR);
4718 } while (--retry && retval.val == -1 && (from_errno(retval.funerr) == SOCK_EAGAIN));
4719
4720 if (retval.val == -1) {
4721 task_dump_err(retval.funerr);
4722 #if defined (WIN32) || defined (WIN64)
4723 G_MESSAGE("Socket creation failed with error %d.",
4724 retval.funerr);
4725 #else
4726 G_MESSAGE("Socket creation failed with error %d - %s.",
4727 retval.funerr, strerror(retval.funerr));
4728 #endif
4729 abort();
4730 }
4731 return retval;
4732 }
4733
/* Read at most n bytes from connection rfd into buffer buf */
socket_read(connection_descriptor * rfd,void * buf,int n)4735 static result socket_read(connection_descriptor*rfd, void *buf, int n)
4736 {
4737 result ret = {0,0};
4738
4739 assert(n >= 0);
4740
4741 do {
4742 ret = con_read(rfd, buf, n);
4743 task_dump_err(ret.funerr);
4744 } while (ret.val < 0 && can_retry_read(ret.funerr));
4745 assert(!can_retry_read(ret.funerr));
4746 return ret;
4747 }
4748
4749
/* Read exactly n bytes from connection rfd into buffer p */
/*
  Read exactly n bytes into p.
  Returns n on success, 0 if the peer closed the connection before n bytes
  arrived, and -1 on a read error.
*/
static int64_t socket_read_bytes(connection_descriptor *rfd, char *p, uint32_t n)
{
  char *cursor = p;
  uint32_t remaining = n;
  result got = {0, 0};

  while (remaining > 0) {
    /* socket_read() takes an int length, so never ask for more than
       INT_MAX bytes at once. */
    int const chunk = (int) MIN(remaining, INT_MAX);

    got = socket_read(rfd, cursor, chunk);
    if (got.val < 0)
      return -1;
    if (got.val == 0)
      return 0;
    cursor += got.val;
    remaining -= (uint32_t)got.val;
  }
  assert(remaining == 0);
  return n;
}
4778
/* Write n bytes from buffer _buf to connection wfd */
/*
  Write all n bytes of _buf, in chunks of at most INT_MAX bytes, retrying
  each chunk while can_retry_write() allows it.
  Returns n on success, -1 if a write fails or writes nothing.
*/
static int64_t socket_write(connection_descriptor *wfd, void *_buf, uint32_t n)
{
  char *bytes = (char *) _buf;
  uint32_t total = 0; /* Keeps track of number of bytes written so far */
  result ret = {0, 0};

  while (total < n) {
    int const chunk = (int) MIN(n - total, INT_MAX);

    for (;;) {
      ret = con_write(wfd, bytes + total, chunk);
      if (ret.val >= 0 || !can_retry_write(ret.funerr))
        break;
      task_dump_err(ret.funerr);
      DBGOUT(FN; STRLIT("retry "); NEXP(total, d); NEXP(n, d));
    }
    if (ret.val <= 0) { /* Something went wrong */
      task_dump_err(ret.funerr);
      return - 1;
    }
    total += (uint32_t)ret.val; /* Add number of bytes written to total */
  }
  DBGOUT(FN; NEXP(total, u); NEXP(n, u));
  assert(total == n);
  return total;
}
4807
xcom_close_socket(int * sock)4808 static inline result xcom_close_socket(int *sock)
4809 {
4810 result res = {0,0};
4811 if (*sock != -1) {
4812 do {
4813 SET_OS_ERR(0);
4814 res.val = CLOSESOCKET(*sock);
4815 res.funerr = to_errno(GET_OS_ERR);
4816 } while (res.val == -1 && from_errno(res.funerr) == SOCK_EINTR);
4817 *sock = -1;
4818 }
4819 return res;
4820 }
4821
xcom_shut_close_socket(int * sock)4822 static inline result xcom_shut_close_socket(int *sock)
4823 {
4824 result res = {0,0};
4825 if (*sock >= 0) {
4826 #if defined (WIN32) || defined (WIN64)
4827 static LPFN_DISCONNECTEX DisconnectEx = NULL;
4828 if (DisconnectEx == NULL)
4829 {
4830 DWORD dwBytesReturned;
4831 GUID guidDisconnectEx = WSAID_DISCONNECTEX;
4832 WSAIoctl(*sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
4833 &guidDisconnectEx, sizeof(GUID),
4834 &DisconnectEx, sizeof(DisconnectEx),
4835 &dwBytesReturned, NULL, NULL);
4836 }
4837 if (DisconnectEx != NULL)
4838 {
4839 (DisconnectEx(*sock, (LPOVERLAPPED)NULL,
4840 (DWORD)0, (DWORD)0) == TRUE) ? 0 : -1;
4841 }
4842 else
4843 #endif
4844 shutdown(*sock, _SHUT_RDWR);
4845 res = xcom_close_socket(sock);
4846 }
4847 return res;
4848 }
4849
/*
  Mark the connection attempt in timed_connect() as failed and jump to its
  `end` cleanup label. Wrapped in do { } while (0) so the two statements act
  as one, e.g. in an unbraced `if`.
*/
#define CONNECT_FAIL do { ret_fd = -1; goto end; } while (0)
4851
/*
  Connect fd to sock_addr, waiting up to `timeout` (10000) milliseconds for a
  non-blocking connect to complete.

  The socket is made non-blocking for connect()/poll() and is switched back
  to blocking mode on every path that reaches `end`. The fd is never closed
  here; that is left to the caller.

  @return fd on success; -1 if the socket cannot be made non-blocking,
          connect() fails with anything other than
          EWOULDBLOCK/EINPROGRESS/EALREADY, poll() times out or fails,
          POLLOUT is not reported, POLLERR/POLLHUP/POLLNVAL is reported,
          SO_ERROR is set or unreadable, or blocking mode cannot be restored.
*/
static int timed_connect(int fd, sockaddr *sock_addr, socklen_t sock_size)
{
  int timeout = 10000; /* poll() timeout in milliseconds */
  int ret_fd = fd;
  int syserr;
  int sysret;
  struct pollfd fds;
#ifdef WITH_LOG_DEBUG
  /* NOTE(review): buf only exists under WITH_LOG_DEBUG; the G_DEBUG below
     that uses it presumably drops its arguments otherwise — confirm. */
  char buf[SYS_STRERROR_SIZE];
#endif

  fds.fd = fd;
  fds.events = POLLOUT;
  fds.revents = 0;

  /* Set non-blocking */
  if (unblock_fd(fd) < 0)
    return -1;

  /* Trying to connect with timeout */
  SET_OS_ERR(0);
  sysret = connect(fd, sock_addr, sock_size);

  if (is_socket_error(sysret)) {
    syserr = GET_OS_ERR;
    /* If the error is SOCK_EWOULDBLOCK or SOCK_EINPROGRESS or SOCK_EALREADY,
     * wait. */
    switch (syserr) {
    case SOCK_EWOULDBLOCK:
    case SOCK_EINPROGRESS:
    case SOCK_EALREADY:
      break;
    default:
      G_DEBUG("connect - Error connecting (socket=%d, error=%d).",
              fd, syserr);
      CONNECT_FAIL;
    }

    /* Wait for writability, restarting poll() on EINTR/EINPROGRESS. */
    SET_OS_ERR(0);
    while ((sysret = poll(&fds, 1, timeout)) < 0) {
      syserr = GET_OS_ERR;
      if (syserr != SOCK_EINTR && syserr != SOCK_EINPROGRESS) break;
      SET_OS_ERR(0);
    }
    MAY_DBG(FN; STRLIT("poll - Finished. "); NEXP(sysret, d));

    if (sysret == 0) {
      G_DEBUG("Timed out while waiting for connection to be established! "
              "Canceling connection attempt. (socket= %d, error=%d)",
              fd, sysret);
      CONNECT_FAIL;
    }

    /* syserr holds the error captured in the poll loop above. */
    if (is_socket_error(sysret)) {
      G_DEBUG("poll - Error while connecting! (socket= %d, error=%d)",
              fd, syserr);
      CONNECT_FAIL;
    }

    /* poll() reported an event: verify the connect actually succeeded. */
    {
      int socket_errno = 0;
      socklen_t socket_errno_len = sizeof(socket_errno);

      if ((fds.revents & POLLOUT) == 0) {
        MAY_DBG(FN; STRLIT("POLLOUT not set - Socket failure!"););
        ret_fd = -1;
      }

      if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) {
        MAY_DBG(FN;
                STRLIT("POLLERR | POLLHUP | POLLNVAL set - Socket failure!"););
        ret_fd = -1;
      }

      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &socket_errno,
                     &socket_errno_len) != 0) {
        G_DEBUG("getsockopt socket %d failed.", fd);
        ret_fd = -1;
      } else {
        if (socket_errno != 0) {
          G_DEBUG("Connection to socket %d failed with error %d - %s.", fd,
                  socket_errno, strerr_msg(buf, sizeof(buf), socket_errno));
          ret_fd = -1;
        }
      }
    }
  }

end:
  /* Set blocking */
  SET_OS_ERR(0);
  if(block_fd(fd) < 0) {
    G_DEBUG(
        "Unable to set socket back to blocking state. (socket=%d, error=%d).",
        fd, GET_OS_ERR);
    return -1;
  }

  return ret_fd;
}
4952
4953
4954 /* Connect to server on given port */
connect_xcom(char * server,xcom_port port)4955 static connection_descriptor* connect_xcom(char *server, xcom_port port)
4956 {
4957 result fd = {0,0};
4958 result ret = {0,0};
4959 struct sockaddr_in sock_addr;
4960 socklen_t sock_size;
4961 #ifdef WITH_LOG_DEBUG
4962 char buf[SYS_STRERROR_SIZE];
4963 #endif
4964
4965 DBGOUT(FN; STREXP(server); NEXP(port, d));
4966 G_DEBUG("connecting to %s %d", server, port);
4967 /* Create socket */
4968 if ((fd = checked_create_socket(AF_INET, SOCK_STREAM, 0)).val < 0) {
4969 G_DEBUG("Error creating sockets.");
4970 return NULL;
4971 }
4972
4973 /* Get address of server */
4974 if (!init_sockaddr(server, &sock_addr, &sock_size, port)) {
4975 xcom_close_socket(&fd.val);
4976 G_DEBUG("Error initializing socket addresses.");
4977 return NULL;
4978 }
4979
4980 /* Connect socket to address */
4981
4982 SET_OS_ERR(0);
4983 if (timed_connect(fd.val, (struct sockaddr *)&sock_addr, sock_size) == -1) {
4984 fd.funerr = to_errno(GET_OS_ERR);
4985 G_DEBUG("Connecting socket to address %s in port %d failed with error %d - %s.",
4986 server, port, fd.funerr, strerr_msg(buf, sizeof(buf), fd.funerr));
4987 xcom_close_socket(&fd.val);
4988 return NULL;
4989 }
4990
4991 {
4992 int peer = 0;
4993 /* Sanity check before return */
4994 SET_OS_ERR(0);
4995 ret.val = peer = getpeername(fd.val, (struct sockaddr *)&sock_addr,
4996 &sock_size);
4997 ret.funerr = to_errno(GET_OS_ERR);
4998 if (peer >= 0) {
4999 ret = set_nodelay(fd.val);
5000 if(ret.val < 0){
5001 task_dump_err(ret.funerr);
5002 xcom_shut_close_socket(&fd.val);
5003 #if defined (WIN32) || defined (WIN64)
5004 G_DEBUG("Setting node delay failed while connecting to %s with error %d.",
5005 server, ret.funerr);
5006 #else
5007 G_DEBUG("Setting node delay failed while connecting to %s with error %d - %s.",
5008 server, ret.funerr, strerror(ret.funerr));
5009 #endif
5010 return NULL;
5011 }
5012 G_DEBUG("client connected to %s %d fd %d", server, port, fd.val);
5013 } else {
5014 /* Something is wrong */
5015 socklen_t errlen = sizeof(ret.funerr);
5016 DBGOUT(FN; STRLIT("getpeername failed"); );
5017 if (ret.funerr) {
5018 DBGOUT(FN; NEXP(from_errno(ret.funerr), d);
5019 STRLIT(strerror(from_errno(ret.funerr))));
5020 }
5021 getsockopt(fd.val, SOL_SOCKET, SO_ERROR, (void *) & ret.funerr, &errlen);
5022 if (ret.funerr == 0) {
5023 ret.funerr = to_errno(SOCK_ECONNREFUSED);
5024 }
5025 xcom_shut_close_socket(&fd.val);
5026 #if defined (WIN32) || defined (WIN64)
5027 G_DEBUG("Getting the peer name failed while connecting to server %s with error %d.",
5028 server, ret.funerr);
5029 #else
5030 G_DEBUG("Getting the peer name failed while connecting to server %s with error %d -%s.",
5031 server, ret.funerr, strerror(ret.funerr));
5032 #endif
5033 return NULL;
5034 }
5035
5036 #ifdef XCOM_HAVE_OPENSSL
5037 if (xcom_use_ssl()) {
5038 connection_descriptor *cd = 0;
5039 SSL * ssl = SSL_new(client_ctx);
5040 G_DEBUG("Trying to connect using SSL.")
5041 SSL_set_fd(ssl, fd.val);
5042
5043 ERR_clear_error();
5044 ret.val = SSL_connect(ssl);
5045 ret.funerr = to_ssl_err(SSL_get_error(ssl, ret.val));
5046
5047 if (ret.val != SSL_SUCCESS) {
5048 G_MESSAGE("Error connecting using SSL %d %d.",
5049 ret.funerr, SSL_get_error(ssl, ret.val));
5050 task_dump_err(ret.funerr);
5051 SSL_shutdown(ssl);
5052 SSL_free(ssl);
5053 xcom_shut_close_socket(&fd.val);
5054 return NULL;
5055 }
5056 DBGOUT(FN; STRLIT("ssl connected to "); STRLIT(server); NDBG(port,d); NDBG(fd.val, d); PTREXP(ssl));
5057
5058 if (ssl_verify_server_cert(ssl, server))
5059 {
5060 G_MESSAGE("Error validating certificate and peer.");
5061 task_dump_err(ret.funerr);
5062 SSL_shutdown(ssl);
5063 SSL_free(ssl);
5064 xcom_shut_close_socket(&fd.val);
5065 return NULL;
5066 }
5067
5068 cd = new_connection(fd.val, ssl);
5069 set_connected(cd, CON_FD);
5070 G_DEBUG("Success connecting using SSL.")
5071 return cd;
5072 } else {
5073 connection_descriptor *cd = new_connection(fd.val, 0);
5074 set_connected(cd, CON_FD);
5075 return cd;
5076 }
5077 #else
5078 {
5079 connection_descriptor *cd = new_connection(fd.val);
5080 set_connected(cd, CON_FD);
5081 return cd;
5082 }
5083 #endif
5084 }
5085 }
5086
xcom_open_client_connection(char * server,xcom_port port)5087 connection_descriptor* xcom_open_client_connection(char *server, xcom_port port)
5088 {
5089 return connect_xcom(server, port);
5090 }
5091
5092 /* Send a protocol negotiation message on connection con */
xcom_send_proto(connection_descriptor * con,xcom_proto x_proto,x_msg_type x_type,unsigned int tag)5093 static int xcom_send_proto(connection_descriptor *con, xcom_proto x_proto, x_msg_type x_type, unsigned int tag)
5094 {
5095 char buf[MSG_HDR_SIZE];
5096 memset(buf, 0, MSG_HDR_SIZE);
5097
5098 if (con->fd >= 0) {
5099 con->snd_tag = tag;
5100 write_protoversion(VERS_PTR((unsigned char*) buf), x_proto);
5101 put_header_1_0((unsigned char*) buf, 0, x_type, tag);
5102 {
5103 int sent;
5104 sent = (int)socket_write(con, buf, MSG_HDR_SIZE);
5105 if (con->fd < 0) {
5106 return -1;
5107 }
5108 return sent;
5109 }
5110 } else {
5111 return -1;
5112 }
5113 }
5114
xcom_recv_proto(connection_descriptor * rfd,xcom_proto * x_proto,x_msg_type * x_type,unsigned int * tag)5115 static int xcom_recv_proto(connection_descriptor * rfd, xcom_proto *x_proto, x_msg_type *x_type, unsigned int *tag)
5116 {
5117 int n;
5118 unsigned char header_buf[MSG_HDR_SIZE];
5119 uint32_t msgsize;
5120
5121 /* Read length field, protocol version, and checksum */
5122 n = (int)socket_read_bytes(rfd, (char*)header_buf, MSG_HDR_SIZE);
5123
5124 if (n != MSG_HDR_SIZE) {
5125 DBGOUT(FN; NDBG(n, d));
5126 return -1;
5127 }
5128
5129 *x_proto = read_protoversion(VERS_PTR(header_buf));
5130 get_header_1_0(header_buf, &msgsize, x_type, tag);
5131
5132 return n;
5133 }
5134
5135 #define TAG_START 313
5136
xcom_send_client_app_data(connection_descriptor * fd,app_data_ptr a,int force)5137 static int64_t xcom_send_client_app_data(connection_descriptor *fd, app_data_ptr a, int force)
5138 {
5139 pax_msg * msg = pax_msg_new(null_synode, 0);
5140 uint32_t buflen = 0;
5141 char *buf = 0;
5142 int64_t retval= 0;
5143
5144 if(! proto_done(fd)){
5145 xcom_proto x_proto;
5146 x_msg_type x_type;
5147 unsigned int tag;
5148 retval = xcom_send_proto(fd, my_xcom_version, x_version_req, TAG_START);
5149 G_DEBUG("client sent negotiation request for protocol %d",my_xcom_version);
5150 if(retval < 0)
5151 goto end;
5152 retval = xcom_recv_proto(fd, &x_proto, &x_type, &tag);
5153 if(retval < 0)
5154 goto end;
5155 if(tag != TAG_START)
5156 {
5157 retval = -1;
5158 goto end;
5159 }
5160 if(x_type != x_version_reply)
5161 {
5162 retval = -1;
5163 goto end;
5164 }
5165
5166 if(x_proto == x_unknown_proto){
5167 G_DEBUG("no common protocol, returning error");
5168 retval = -1;
5169 goto end;
5170 }
5171 G_DEBUG("client connection will use protocol version %d",x_proto);
5172 DBGOUT(STRLIT("client connection will use protocol version ");
5173 NDBG(x_proto,u); STRLIT(xcom_proto_to_str(x_proto)));
5174 fd->x_proto = x_proto;
5175 set_connected(fd, CON_PROTO);
5176 }
5177 msg->a = a;
5178 msg->to = VOID_NODE_NO;
5179 msg->op = client_msg;
5180 msg->force_delivery = force;
5181
5182 serialize_msg(msg, fd->x_proto, &buflen, &buf);
5183 if(buflen){
5184 retval = socket_write(fd, buf, buflen);
5185 if (buflen != retval) {
5186 DBGOUT(FN; STRLIT("write failed "); NDBG(fd->fd, d);
5187 NDBG(buflen, d); NDBG(retval, d));
5188 }
5189 X_FREE(buf);
5190 }
5191 end:
5192 msg->a = 0; /* Do not deallocate a */
5193 XCOM_XDR_FREE(xdr_pax_msg, msg);
5194 return retval;
5195 }
5196
xcom_client_send_data(uint32_t size,char * data,connection_descriptor * fd)5197 int64_t xcom_client_send_data(uint32_t size, char *data, connection_descriptor *fd)
5198 {
5199 app_data a;
5200 int64_t retval = 0;
5201 init_app_data(&a);
5202 a.body.c_t = app_type;
5203 a.body.app_u_u.data.data_len = size;
5204 a.body.app_u_u.data.data_val = data;
5205 retval = xcom_send_client_app_data(fd, &a, 0);
5206 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5207 return retval;
5208 }
5209
socket_read_msg(connection_descriptor * rfd,pax_msg * p)5210 static pax_msg * socket_read_msg(connection_descriptor *rfd, pax_msg *p)
5211 /* Should buffer reads as well */
5212 {
5213 int64_t n;
5214 char *bytes;
5215 unsigned char header_buf[MSG_HDR_SIZE];
5216 xcom_proto x_version;
5217 uint32_t msgsize;
5218 x_msg_type x_type;
5219 unsigned int tag;
5220 int deserialize_ok = 0;
5221
5222 bytes = NULL;
5223
5224 /* Read version, length, type, and tag */
5225 n = socket_read_bytes(rfd, (char*)header_buf, MSG_HDR_SIZE);
5226
5227 if (n <= 0) {
5228 DBGOUT(FN; NDBG(n, ll));
5229 return 0;
5230 }
5231 assert(n == MSG_HDR_SIZE);
5232 x_version = get_32(VERS_PTR(header_buf));
5233 /* Check the protocol version before doing anything else */
5234 #ifdef XCOM_PARANOID
5235 assert(check_protoversion(x_version, rfd->x_proto));
5236 #endif
5237 if (!check_protoversion(x_version, rfd->x_proto)) {
5238 return 0;
5239 }
5240
5241 /* OK, we can grok this version */
5242
5243 get_header_1_0(header_buf, &msgsize, & x_type, &tag);
5244
5245 /* Allocate buffer space for message */
5246 bytes = calloc(1, msgsize);
5247
5248 /* Read message */
5249 n = socket_read_bytes(rfd, bytes, msgsize);
5250
5251 if (n > 0) {
5252 /* Deserialize message */
5253 deserialize_ok = deserialize_msg(p, rfd->x_proto, bytes, msgsize);
5254 MAY_DBG(FN; STRLIT(" deserialized message"));
5255 }
5256 /* Deallocate buffer */
5257 X_FREE(bytes);
5258 if (n <= 0 || deserialize_ok == 0) {
5259 DBGOUT(FN; NDBG(n, ll));
5260 return 0;
5261 }
5262 return(p);
5263 }
5264
xcom_close_client_connection(connection_descriptor * connection)5265 int xcom_close_client_connection(connection_descriptor *connection)
5266 {
5267 int retval = 0;
5268
5269 #ifdef XCOM_HAVE_OPENSSL
5270 if (connection->ssl_fd) {
5271 SSL_shutdown(connection->ssl_fd);
5272 ssl_free_con(connection);
5273 }
5274 #endif
5275 retval = xcom_shut_close_socket(&connection->fd).val;
5276 free(connection);
5277 return retval;
5278 }
5279
xcom_client_boot(connection_descriptor * fd,node_list * nl,uint32_t group_id)5280 int xcom_client_boot(connection_descriptor *fd, node_list *nl, uint32_t group_id)
5281 {
5282 app_data a;
5283 int retval = 0;
5284 retval = (int)xcom_send_client_app_data(fd, init_config_with_group(&a, nl, unified_boot_type, group_id), 0);
5285 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5286 return retval;
5287 }
5288
xcom_send_app_wait(connection_descriptor * fd,app_data * a,int force)5289 int xcom_send_app_wait(connection_descriptor *fd, app_data *a, int force)
5290 {
5291 int retval = 0;
5292 int retry_count = 10; // Same as 'connection_attempts'
5293 pax_msg p;
5294 pax_msg *rp = 0;
5295
5296 do {
5297 retval = (int)xcom_send_client_app_data(fd, a, force);
5298 if(retval < 0)
5299 return 0;
5300 memset(&p, 0, sizeof(p));
5301 rp = socket_read_msg(fd, &p);
5302 if(rp){
5303 client_reply_code cli_err = rp->cli_err;
5304 my_xdr_free((xdrproc_t)xdr_pax_msg, (char*)&p);
5305 switch(cli_err){
5306 case REQUEST_OK:
5307 return 1;
5308 case REQUEST_FAIL:
5309 G_DEBUG("cli_err %d",cli_err);
5310 return 0;
5311 case REQUEST_RETRY:
5312 G_DEBUG("cli_err %d",cli_err);
5313 xcom_sleep(1);
5314 break;
5315 default:
5316 G_WARNING("client protocol botched");
5317 return 0;
5318 }
5319 }else{
5320 G_WARNING("read failed");
5321 return 0;
5322 }
5323 } while (--retry_count);
5324 // Timeout after REQUEST_RETRY has been received 'retry_count' times
5325 G_MESSAGE(
5326 "Request failed: maximum number of retries (10) has been exhausted.");
5327 return 0;
5328 }
5329
xcom_send_cfg_wait(connection_descriptor * fd,node_list * nl,uint32_t group_id,cargo_type ct,int force)5330 int xcom_send_cfg_wait(connection_descriptor * fd, node_list *nl,
5331 uint32_t group_id, cargo_type ct, int force)
5332 {
5333 app_data a;
5334 int retval = 0;
5335 DBGOUT(FN; COPY_AND_FREE_GOUT(dbg_list(nl)););
5336 retval = xcom_send_app_wait(fd, init_config_with_group(&a, nl, ct, group_id), force);
5337 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5338 return retval;
5339 }
5340
xcom_client_add_node(connection_descriptor * fd,node_list * nl,uint32_t group_id)5341 int xcom_client_add_node(connection_descriptor *fd, node_list *nl,
5342 uint32_t group_id)
5343 {
5344 return xcom_send_cfg_wait(fd, nl, group_id, add_node_type, 0);
5345 }
5346
xcom_client_remove_node(connection_descriptor * fd,node_list * nl,uint32_t group_id)5347 int xcom_client_remove_node(connection_descriptor *fd, node_list *nl,
5348 uint32_t group_id)
5349 {
5350 return xcom_send_cfg_wait(fd, nl, group_id, remove_node_type, 0);
5351 }
5352
#ifdef NOTDEF
/* Not completely implemented, need to be handled properly
   when received as a client message in dispatch_op.
   Should have separate opcode from normal add/remove,
   like force config_type.
   The connection parameter is named `fd` because the bodies use it; with an
   unnamed parameter this code would not compile once NOTDEF is defined. */
int xcom_client_force_add_node(connection_descriptor *fd, node_list *nl,
                               uint32_t group_id)
{
  return xcom_send_cfg_wait(fd, nl, group_id, add_node_type, 1);
}

int xcom_client_force_remove_node(connection_descriptor *fd, node_list *nl,
                                  uint32_t group_id)
{
  return xcom_send_cfg_wait(fd, nl, group_id, remove_node_type, 1);
}
#endif
5370
xcom_client_force_config(connection_descriptor * fd,node_list * nl,uint32_t group_id)5371 int xcom_client_force_config(connection_descriptor *fd, node_list *nl,
5372 uint32_t group_id)
5373 {
5374 return xcom_send_cfg_wait(fd, nl, group_id, force_config_type, 1);
5375 }
5376
xcom_client_enable_arbitrator(connection_descriptor * fd)5377 int xcom_client_enable_arbitrator(connection_descriptor *fd)
5378 {
5379 app_data a;
5380 int retval = 0;
5381 init_app_data(&a);
5382 a.body.c_t = enable_arbitrator;
5383 retval = xcom_send_app_wait(fd, &a, 0);
5384 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5385 return retval;
5386 }
5387
5388
xcom_client_disable_arbitrator(connection_descriptor * fd)5389 int xcom_client_disable_arbitrator(connection_descriptor *fd)
5390 {
5391 app_data a;
5392 int retval = 0;
5393 init_app_data(&a);
5394 a.body.c_t = disable_arbitrator;
5395 retval = xcom_send_app_wait(fd, &a, 0);
5396 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5397 return retval;
5398 }
5399
xcom_client_terminate_and_exit(connection_descriptor * fd)5400 int xcom_client_terminate_and_exit(connection_descriptor *fd)
5401 {
5402 app_data a;
5403 int retval = 0;
5404 init_app_data(&a);
5405 a.body.c_t = x_terminate_and_exit;
5406 retval = xcom_send_app_wait(fd, &a, 0);
5407 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5408 return retval;
5409 }
5410
xcom_client_set_cache_limit(connection_descriptor * fd,uint64_t cache_limit)5411 int xcom_client_set_cache_limit(connection_descriptor *fd, uint64_t cache_limit)
5412 {
5413 app_data a;
5414 int retval = 0;
5415 init_app_data(&a);
5416 a.body.c_t = set_cache_limit;
5417 a.body.app_u_u.cache_limit = cache_limit;
5418 retval = xcom_send_app_wait(fd, &a, 0);
5419 my_xdr_free((xdrproc_t) xdr_app_data, (char*)&a);
5420 return retval;
5421 }
5422
5423
5424
5425
5426