1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <rpc/rpc.h>
24 
25 #include <assert.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <math.h>
30 #include <limits.h>
31 
32 #include "xcom_common.h"
33 #include "x_platform.h"
34 #include "simset.h"
35 #include "xcom_vp.h"
36 #include "task.h"
37 #include "task_os.h"
38 #include "task_debug.h"
39 #include "node_no.h"
40 #include "server_struct.h"
41 #include "xcom_detector.h"
42 #include "site_struct.h"
43 #include "node_connection.h"
44 #include "node_list.h"
45 #include "xcom_transport.h"
46 #include "xcom_statistics.h"
47 #include "xcom_base.h"
48 #include "xcom_vp_str.h"
49 #include "xcom_msg_queue.h"
50 #include "xcom_memory.h"
51 #include "site_def.h"
52 #include "synode_no.h"
53 
54 #ifdef XCOM_HAVE_OPENSSL
55 #include "openssl/ssl.h"
56 #include "openssl/err.h"
57 #endif
58 #include "sock_probe.h"
59 #include "retry.h"
60 #ifdef XCOM_HAVE_OPENSSL
61 #include "xcom_ssl_transport.h"
62 #endif
63 
/* Protocol version that my_xcom_version is initialised to below */
#define MY_XCOM_PROTO x_1_2

xcom_proto const my_min_xcom_version = x_1_0; /* The minimum protocol version I am able to understand */
xcom_proto const my_xcom_version = MY_XCOM_PROTO; /* The maximum protocol version I am able to understand */

/* #define XCOM_ECM */

/* Number of slots in the all_servers table */
#define SERVER_MAX (2*NSERVERS)

/* Turn Nagle's algorithm on or off (dial() calls set_nodelay when this is 0) */
static int const NAGLE = 0;

/* Defined elsewhere; tcp_server leaves its accept loop once this is non-zero */
extern int	xcom_shutdown;

static void shut_srv(server *s);

static xcom_port	xcom_listen_port = 0; /* Port used by xcom */

/* Optional filter consulted by tcp_server for every accepted socket; NULL means none */
static xcom_socket_accept_cb xcom_socket_accept_callback= NULL;
83 
84 /* purecov: begin deadcode */
/* Default port matcher: non-zero when port is the port xcom listens on */
static int pm(xcom_port port)
{
	int const is_listen_port = (port == xcom_listen_port);

	return is_listen_port;
}
89 /* purecov: end */
90 
/*
  Register the callback that tcp_server consults for each accepted socket.
  Passing NULL removes the filter. Always returns 1.
*/
int set_xcom_socket_accept_cb(xcom_socket_accept_cb x)
{
	xcom_socket_accept_callback = x;

	return 1;
}
96 
/*
  Remember the port xcom listens on. If no port matcher has been
  registered yet, install the built-in one (pm).
*/
void init_xcom_transport(xcom_port listen_port)
{
	xcom_listen_port = listen_port;

	if (!get_port_matcher()) {
/* purecov: begin deadcode */
		set_port_matcher(pm);
/* purecov: end */
	}
}
105 
106 
/* Mark the buffer as empty: fill level and extract offset both return to 0 */
void reset_srv_buf(srv_buf *sb)
{
	sb->n = 0;
	sb->start = 0;
}
112 
113 
114 /* Note that channel is alive */
alive(server * s)115 static void alive(server *s)
116 {
117 	if (s) {
118 		s->active = task_now();
119 	}
120 }
121 
122 
123 
srv_buf_capacity(srv_buf * sb)124 static u_int srv_buf_capacity(srv_buf *sb)
125 {
126 	return sizeof(sb->buf);
127 }
128 
srv_buf_free_space(srv_buf * sb)129 static u_int srv_buf_free_space(srv_buf *sb)
130 {
131 	return ((u_int)sizeof(sb->buf)) - sb->n;
132 }
133 
srv_buf_buffered(srv_buf * sb)134 static u_int srv_buf_buffered(srv_buf *sb)
135 {
136 	return sb->n - sb->start;
137 }
138 
/* Address of the next byte to be extracted */
static char	*srv_buf_extract_ptr(srv_buf *sb)
{
	return sb->buf + sb->start;
}
143 
144 
/* Address where the next inserted byte will be stored */
static char	*srv_buf_insert_ptr(srv_buf *sb)
{
	return sb->buf + sb->n;
}
149 
150 
/* Consume len bytes by moving the extract offset forward */
static inline void advance_extract_ptr(srv_buf *sb, u_int len)
{
	sb->start = sb->start + len;
}
155 
get_srv_buf(srv_buf * sb,char * data,u_int len)156 static u_int get_srv_buf(srv_buf *sb, char *data, u_int len)
157 {
158 	if(len > srv_buf_buffered(sb)){
159 		len = srv_buf_buffered(sb);
160 	}
161 
162 	memcpy(data, srv_buf_extract_ptr(sb), len);
163 	advance_extract_ptr(sb, len);
164 	return len;
165 }
166 
/* Account for len newly stored bytes by moving the fill level forward */
static inline void advance_insert_ptr(srv_buf *sb, u_int len)
{
	sb->n = sb->n + len;
}
171 
put_srv_buf(srv_buf * sb,char * data,u_int len)172 static u_int put_srv_buf(srv_buf *sb, char *data, u_int len)
173 {
174 	assert(sb->n + len <= sizeof(sb->buf));
175 	memcpy(srv_buf_insert_ptr(sb), data, len);
176 	advance_insert_ptr(sb, len);
177 	return len;
178 }
179 
180 
/*
  Task function: write everything buffered in s->out_buf to the
  connection of server s.

  The fill level is saved in the task environment and the buffer is reset
  before the write is started; the bytes themselves stay in
  s->out_buf.buf and are written from there.

  Completes with TASK_RETURN(sent), where sent stays 0 if nothing was
  buffered, or with TASK_FAIL if the connection has no valid fd. If the
  write reports sent <= 0 the connection is shut down.
*/
int	flush_srv_buf(server *s, int64_t *ret)
{
	DECL_ENV
	    u_int buflen;
	END_ENV;

	TASK_BEGIN
	    ep->buflen = s->out_buf.n;
	/* Buffer is logically empty from here on; ep->buflen remembers how much to write */
	reset_srv_buf(&s->out_buf);
	if (s->con.fd >= 0) {
		int64_t	sent = 0;
		if (ep->buflen) {
			/* DBGOUT(FN; PTREXP(stack); NDBG(ep->buflen, u)); */
			/* LOCK_FD(s->con.fd, 'w'); */
			TASK_CALL(task_write(&s->con, s->out_buf.buf, ep->buflen, &sent));
			/* UNLOCK_FD(s->fd, 'w'); */
			if (sent <= 0) {
				/* Write failed or made no progress: give up on this connection */
				shutdown_connection(&s->con);
			}
		}
		TASK_RETURN(sent);
	} else {
		TASK_FAIL;
	}

	FINALLY
	    TASK_END;
}
209 
/*
  Send a message to server s.

  Task function. Sets p->to = to, then:
  - if the destination is the sender itself (to == p->from) the message is
    handed straight to dispatch_op and the task returns sizeof(*p);
  - otherwise, if s has a valid fd, the message is serialized and either
    appended to s->out_buf or, when it does not fit even after a flush,
    written directly with task_write. The task returns the number of
    bytes buffered or written (0 if serialization produced no bytes);
  - otherwise the task fails.

  The serialized buffer is kept in the task environment (ep->buf) and is
  released either on the normal path or in the FINALLY section.
*/
static int	_send_msg(server *s, pax_msg *p, node_no to, int64_t *ret)
{
	DECL_ENV
	uint32_t	buflen;
	char	*buf;
	END_ENV;

	TASK_BEGIN
		p->to = to;
	MAY_DBG(FN; PTREXP(stack); PTREXP(s); PTREXP(p); NDBG(s->con.fd, d));
	MAY_DBG(FN;
			STREXP(s->srv);
			NDBG(s->port, d);
			NDBG(task_now(), f);
			COPY_AND_FREE_GOUT(dbg_pax_msg(p));
			);
	if (to == p->from) {
		/* Message to self: deliver locally without touching the network */
		MAY_DBG(FN;
				COPY_AND_FREE_GOUT(dbg_pax_msg(p)); );
		dispatch_op(find_site_def(p->synode), p, NULL);
		TASK_RETURN(sizeof(*p));
	} else {
		if (s->con.fd >= 0) {
			int64_t	sent= 0;
			/* LOCK_FD(s->con.fd, 'w'); */
			serialize_msg(p, s->con.x_proto, &ep->buflen, &ep->buf);
			if(ep->buflen){
				/* Not enough space? Flush the buffer */
				if (ep->buflen > srv_buf_free_space(&s->out_buf)) {
					/* NOTE(review): the flush is given this task's ret pointer, so it may store its own result there - confirm */
					TASK_CALL(flush_srv_buf(s, ret));
					if (s->con.fd < 0) {
						TASK_FAIL;
					}
					/* Still not enough? Message must be huge, send without buffering */
					if (ep->buflen > srv_buf_free_space(&s->out_buf)) {
						DBGOUT(FN; STRLIT("task_write"));
						TASK_CALL(task_write(&s->con, ep->buf, ep->buflen, &sent));
						if (s->con.fd < 0) {
							TASK_FAIL;
						}
					} else { /* Buffer the write */
						put_srv_buf(&s->out_buf, ep->buf, ep->buflen);
						sent = ep->buflen;
					}
				} else { /* Buffer the write */
					put_srv_buf(&s->out_buf, ep->buf, ep->buflen);
					sent = ep->buflen;
				}
				/* Per-operation statistics */
				send_count[p->op]++;
				send_bytes[p->op] += ep->buflen;
				alive(s); /* Note activity */
				/* DBGOUT(STRLIT("sent message "); STRLIT(pax_op_to_str(p->op)); */
				/*        NDBG(p->from,d); NDBG(p->to,d); */
				/*        SYCEXP(p->synode);  */
				/*        BALCEXP(p->proposal)); */
				X_FREE(ep->buf);
				/* UNLOCK_FD(s->con.fd, 'w'); */
				if (sent <= 0) {
					shutdown_connection(&s->con);
				}
			}
      TASK_RETURN(sent);
		} else
			TASK_FAIL;
	}
	FINALLY
	    /* Release the serialized message if an early exit left it allocated */
	    if (ep->buf)
			X_FREE(ep->buf);
	TASK_END;
}
281 
282 
/* Store proto_vers in the version field of the message header that starts at buf */
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
{
	unsigned char *vers_field = VERS_PTR(buf);

	put_32(vers_field, proto_vers);
}
287 
read_protoversion(unsigned char * p)288 xcom_proto read_protoversion(unsigned char *p)
289 {
290 	return get_32(p);
291 }
292 
/*
  Compare the protocol version found in a message with the negotiated one.
  Returns 1 when they are equal; otherwise logs both and returns 0.
*/
int	check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
{
	if (x_proto == negotiated)
		return 1;

	DBGOUT(FN; STRLIT(" found XCOM protocol version ");
		   NDBG(x_proto,d); STRLIT(" need version ");
		   NDBG(negotiated,d); );
	return 0;
}
304 
305 
306 /* Send a protocol negotiation message on connection con */
send_proto(connection_descriptor * con,xcom_proto x_proto,x_msg_type x_type,unsigned int tag,int64_t * ret)307 int send_proto(connection_descriptor *con, xcom_proto x_proto,
308                x_msg_type x_type, unsigned int tag, int64_t *ret)
309 {
310 	DECL_ENV
311 	char	buf[MSG_HDR_SIZE];
312 	END_ENV;
313 
314 	TASK_BEGIN
315 		if (con->fd >= 0) {
316 			con->snd_tag = tag;
317 			write_protoversion(VERS_PTR((unsigned char*) ep->buf), x_proto);
318 			put_header_1_0((unsigned char*) ep->buf, 0, x_type, tag);
319 
320 			{
321 				int64_t	sent;
322 
323 				TASK_CALL(task_write(con, ep->buf, MSG_HDR_SIZE, &sent));
324 				if (con->fd < 0) {
325 					TASK_FAIL;
326 				}
327 				if (sent <= 0) {
328 					shutdown_connection(con);
329 				}
330 				TASK_RETURN(sent);
331 			}
332 		} else {
333 			TASK_FAIL;
334 		}
335 	FINALLY
336 
337 		TASK_END;
338 }
339 
/*
  Run the XDR routine xdrfunc over the memory buffer buff (bufflen bytes)
  in direction op (encode, decode or free), with xdrdata as the object
  being converted.

  The protocol version is made available to the XDR routines through the
  x_public field of the stream.

  Returns the result of xdrfunc, or 0 if the memory stream could not be
  set up (x_ops still NULL after xdrmem_create).
*/
int apply_xdr(xcom_proto x_proto, gpointer buff, uint32_t bufflen,
			   xdrproc_t xdrfunc, void *xdrdata,
			   enum xdr_op op)
{
	XDR xdr;
	int	s = 0;

	/* Pre-clear x_ops so a stream that was not initialised can be detected */
	xdr.x_ops = NULL;
	xdrmem_create(&xdr, buff, bufflen, op);
	/*
	  Mac OSX changed the xdrproc_t prototype to take
	  three parameters instead of two.

	  The argument is that it has the potential to break
	  the ABI due to compiler optimizations.

	  The recommended value for the third parameter is
	  0 for those that are not making use of it (which
	  is the case). This will keep this code cross-platform
	  and cross-version compatible.
	*/
	if (xdr.x_ops){
		xdr.x_public = (caddr_t)&x_proto; /* Supply protocol version in user field of xdr */
		s = xdrfunc(&xdr, xdrdata, 0);
		/*
		  xdr_destroy goes through x_ops, so it may only be used on a
		  stream whose x_ops has been set.
		*/
		xdr_destroy(&xdr);
	}
	return s;
}
368 
369 
#if TASK_DBUG_ON
/*
  Debug helper: emit the address of buf followed by each of the
  MSG_HDR_SIZE header bytes (NPUT with the x format) through the GOUT
  debug output macros.
*/
static void dump_header(char *buf)
{
	char	*end = buf + MSG_HDR_SIZE;
	GET_GOUT;
	STRLIT("message header ");
	PTREXP(buf);
	while (buf < end) {
		NPUT(*buf, x);
		buf++;
	}
  PRINT_GOUT;
  FREE_GOUT;
}
#endif
385 
void dbg_app_data(app_data_ptr a);

/*
  With OLD_XDR, make the const keyword vanish for the XDR callbacks that
  follow; the matching #undef comes after xdr_proto_sizeof.
*/
#ifdef OLD_XDR
#define const
#endif
391 
/* ARGSUSED */
/*
  Size-counting x_putlong: nothing is written, the running byte count in
  x_handy just grows by one XDR unit. The type of the second parameter
  depends on the platform's XDR prototype.
*/
static bool_t
x_putlong (XDR *xdrs,
#if defined(__APPLE__) && defined(__LP64__)
           const int *intp MY_ATTRIBUTE((unused))
#elif defined(X_PUTLONG_NOT_USE_CONST)
           long *longp MY_ATTRIBUTE((unused))
#else
           const long *longp MY_ATTRIBUTE((unused))
#endif
)
{
  xdrs->x_handy += BYTES_PER_XDR_UNIT;
  return TRUE;
}
407 
/* ARGSUSED */
/*
  Size-counting x_putbytes: the bytes are ignored and x_handy grows by
  len. The two variants differ only in the parameter types of the XDR
  prototype in use.
*/
#ifdef OLD_XDR
static bool_t
x_putbytes (XDR *xdrs, char *bp MY_ATTRIBUTE((unused)), int len)
{
  xdrs->x_handy += len;
  return TRUE;
}
#else
static bool_t
x_putbytes (XDR *xdrs, const char *bp MY_ATTRIBUTE((unused)), u_int len)
{
  xdrs->x_handy += len;
  return TRUE;
}

#endif
425 
426 
/*
  Size-counting x_getpostn: the "position" of the stream is the number of
  bytes counted so far in x_handy.
*/
static u_int
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(HAVE_TIRPC)
x_getpostn(XDR *xdrs)
#else
x_getpostn (const XDR *xdrs)
#endif
{
#ifdef OLD_XDR
  return (u_int)(xdrs->x_handy);
#else
  return xdrs->x_handy;
#endif
}
440 
/* ARGSUSED */
/* Size-counting x_setpostn: repositioning is refused unconditionally */
static bool_t
x_setpostn (XDR *xdrs MY_ATTRIBUTE((unused)), u_int len MY_ATTRIBUTE((unused)))
{
  /* This is not allowed */
  return FALSE;
}
448 
/* Return type of the x_inline callback, as dictated by the RPC headers in use */
#ifdef HAVE_RPC_INLINE_T
#define INLINE_T rpc_inline_t
#else
#define INLINE_T int32_t
#endif

/*
  Size-counting x_inline: hands out a scratch area of at least len bytes
  so the caller can encode in place, and adds len to x_handy.

  The scratch area lives in x_private and its size is remembered in
  x_base; it is reallocated when the request is not smaller than the
  current size. Returns NULL for len == 0, for streams that are not
  encoding, and when the allocation fails.

  The two variants differ only in the type of len.
*/
#ifdef OLD_XDR
static INLINE_T *
x_inline (XDR *xdrs, int len)
{
	if (len == 0)
		return NULL;
	if (xdrs->x_op != XDR_ENCODE)
		return NULL;
	if (len < (int) (long int) xdrs->x_base) {
		/* x_private was already allocated */
		xdrs->x_handy += len;
		return (INLINE_T * ) xdrs->x_private;
	} else {
		/* Free the earlier space and allocate new area */
		free (xdrs->x_private);
		if ((xdrs->x_private = (caddr_t) malloc ((size_t)len)) == NULL) {
			xdrs->x_base = 0;
			return NULL;
		}
		xdrs->x_base = (void * ) (long) len;
		xdrs->x_handy += len;
		return (INLINE_T * ) xdrs->x_private;
	}
}
#else
static INLINE_T *
x_inline (XDR *xdrs, u_int len)
{
	if (len == 0)
		return NULL;
	if (xdrs->x_op != XDR_ENCODE)
		return NULL;
	if (len < (u_int) (long int) xdrs->x_base) {
		/* x_private was already allocated */
		xdrs->x_handy += len;
		return (INLINE_T * ) xdrs->x_private;
	} else {
		/* Free the earlier space and allocate new area */
		free (xdrs->x_private);
		if ((xdrs->x_private = (caddr_t) malloc (len)) == NULL) {
			xdrs->x_base = 0;
			return NULL;
		}
		xdrs->x_base = (void * ) (long) len;
		xdrs->x_handy += len;
		return (INLINE_T * ) xdrs->x_private;
	}
}
#endif

#undef INLINE_T
506 /* purecov: begin deadcode */
/*
  Placeholder installed as x_getlong by xdr_proto_sizeof. It always
  reports failure.
*/
static bool_t
/* to stop ANSI-C compiler from complaining */
#if defined(__APPLE__) && defined(__LP64__)
dummyfunc1 (XDR *xdr MY_ATTRIBUTE((unused)), int *ip MY_ATTRIBUTE((unused)))
#else
dummyfunc1 (XDR *xdr MY_ATTRIBUTE((unused)), long *lp MY_ATTRIBUTE((unused)))
#endif
{
  /* Always return FALSE/NULL, as the case may be */
  return FALSE;
}
518 
/*
  Placeholder installed as x_getbytes by xdr_proto_sizeof. It always
  reports failure.
*/
static bool_t
dummyfunc2 (XDR *xdr MY_ATTRIBUTE((unused)),
#ifdef OLD_XDR
            caddr_t ct MY_ATTRIBUTE((unused)),
            int i MY_ATTRIBUTE((unused)))
#else
            caddr_t ct MY_ATTRIBUTE((unused)),
            u_int u MY_ATTRIBUTE((unused)))
#endif
{
  /* Always return FALSE/NULL, as the case may be */
  return FALSE;
}
532 
#ifdef HAVE_XDR_OPS_X_GETINT32
/*
  Placeholder installed as x_getint32 by xdr_proto_sizeof on platforms
  whose xdr_ops has that member. It always reports failure.
*/
static bool_t
dummyfunc3 (XDR *xdr MY_ATTRIBUTE((unused)),
            int32_t *ip MY_ATTRIBUTE((unused)))
{
  /* Always return FALSE/NULL, as the case may be */
  return FALSE;
}
#endif
542 
543 static void
x_destroy(XDR * xdrs)544 x_destroy (XDR *xdrs)
545 {
546   xdrs->x_handy = 0;
547   xdrs->x_base = 0;
548   if (xdrs->x_private)
549     {
550       free (xdrs->x_private);
551       xdrs->x_private = NULL;
552     }
553   return;
554 }
555 /* purecov: end */
#ifdef HAVE_XDR_OPS_X_PUTINT32
/*
  Size-counting x_putint32: the value is ignored and x_handy grows by one
  XDR unit. Only built when xdr_ops has an x_putint32 member.
*/
static bool_t
#ifdef OLD_XDR
x_putint32 (XDR *xdrs, int32_t *int32p MY_ATTRIBUTE((unused)))
#else
x_putint32 (XDR *xdrs, const int32_t *int32p MY_ATTRIBUTE((unused)))
#endif
{
  xdrs->x_handy += BYTES_PER_XDR_UNIT;
  return TRUE;
}
#endif
568 
/*
  Compute how many bytes func would produce when encoding data with
  protocol version x_proto, without producing any output.

  A throw-away XDR stream is built by hand whose operations only count
  bytes in x_handy (see the x_* callbacks above); the decode-side
  operations are placeholders that always fail. The protocol version is
  passed to the XDR routines through x_public.

  Returns the counted size, or 0 if func did not return TRUE.
*/
static uint64_t
xdr_proto_sizeof (xcom_proto x_proto, xdrproc_t func, void *data)
{
  XDR x;
  struct xdr_ops ops;
  bool_t stat;


  /* Any operation not set below stays NULL */
  memset(&ops, 0, sizeof(struct xdr_ops));
  ops.x_putlong = x_putlong;
  ops.x_putbytes = x_putbytes;
  ops.x_inline = x_inline;
  ops.x_getpostn = x_getpostn;
  ops.x_setpostn = x_setpostn;
  ops.x_destroy = x_destroy;

#ifdef HAVE_XDR_OPS_X_PUTINT32
  ops.x_putint32 = x_putint32;
#endif
  /* the other harmless ones */
  ops.x_getlong = dummyfunc1;
  ops.x_getbytes = dummyfunc2;
#ifdef HAVE_XDR_OPS_X_GETINT32
  ops.x_getint32 = dummyfunc3;
#endif
  x.x_op = XDR_ENCODE;
  x.x_ops = &ops;
  x.x_handy = 0;
  x.x_private = (caddr_t) NULL;
  x.x_base = (caddr_t) 0;
  x.x_public = (caddr_t)&x_proto;

  /*
    Mac OSX changed the xdrproc_t prototype to take
    three parameters instead of two.

    The argument is that it has the potential to break
    the ABI due to compiler optimizations.

    The recommended value for the third parameter is
    0 for those that are not making use of it (which
    is the case). This will keep this code cross-platform
    and cross-version compatible.
  */
  stat = func (&x, data, 0);
  /* Release the scratch area x_inline may have allocated */
  free (x.x_private);
  // x_handy is int type for old XDR
  return stat == TRUE ? (uint64_t)x.x_handy : 0;
}
618 
/* End of the XDR callbacks: const gets its normal meaning back */
#ifdef OLD_XDR
#undef const
#endif
622 
/*
  Return 0 if it fails to serialize the message, otherwise 1 is returned.

  Serializes the object p with xdrfunc for protocol version x_proto into
  a freshly calloc'ed buffer laid out as header followed by XDR payload.

  On return *out_buf / *out_len describe the allocated buffer whenever
  the allocation succeeded, even if the XDR encoding itself failed; they
  are left untouched when the size limit is exceeded or the allocation
  fails.

  NOTE(review): on an encoding failure the caller receives a non-empty
  buffer whose header was never written - confirm that every caller
  checks the return value rather than only *out_len.
*/
static int serialize(void *p, xcom_proto x_proto, uint32_t *out_len,
                     xdrproc_t xdrfunc, char **out_buf)
{
	unsigned char	*buf = NULL;
	uint64_t msg_buflen = 0;
	uint64_t tot_buflen = 0;
	unsigned int	tag = 0;
	x_msg_type x_type = x_normal;
	int retval = 0;

	/* Find length of serialized message */
	msg_buflen = xdr_proto_sizeof(x_proto, xdrfunc, p);
	tot_buflen = SERIALIZED_BUFLEN(msg_buflen);
	MAY_DBG(FN; NDBG(msg_buflen, llu); NDBG(tot_buflen, llu));
  /*
    Paxos message size is limited in UINT32 range. It will return an
    error if the serialized message is bigger than UINT32_MAX bytes.
  */
	if (tot_buflen > UINT32_MAX)
	{
		G_ERROR("Serialized message exceeds 4GB limit.");
		return retval;
	}

	/*
	  Allocate space for version number, length field, type, tag, and serialized
	  message. The explicit type cast suppresses warnings on 32-bit platforms.
	*/
	buf = calloc(1, (size_t)tot_buflen);
	if (buf) {
		/* Write protocol version */
		write_protoversion(buf, x_proto);

		/* Serialize message */
		retval = apply_xdr(x_proto, MSG_PTR(buf), (uint32_t)msg_buflen,
						   xdrfunc,
						   p, XDR_ENCODE);
		if(retval){
			/* Serialize header into buf */
			put_header_1_0(buf, (uint32_t)msg_buflen, x_type, tag);
		}

		/* Ownership of buf passes to the caller from here on */
		*out_len = (uint32_t)tot_buflen;
		*out_buf = (char * )buf;
	}
	MAY_DBG(FN; NDBG(*out_len, u); PTREXP(*out_buf);
			dump_header(*out_buf));
	return retval;
}
673 
674 
675 /* Version 1 has no new messages, only modified, so all should be sent */
old_proto_knows(xcom_proto x_proto MY_ATTRIBUTE ((unused)),pax_op op MY_ATTRIBUTE ((unused)))676 static inline int old_proto_knows(xcom_proto x_proto MY_ATTRIBUTE((unused)),
677                                   pax_op op MY_ATTRIBUTE((unused)))
678 {
679 	return 1;
680 }
681 
/*
  Serialize the Paxos message p for protocol version x_proto.
  *buf and *buflen are cleared first and then filled in by serialize().
  Returns 1 on success and 0 if the peer's protocol does not know the
  operation or serialization failed.
*/
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
{
	*buf = 0;
	*buflen = 0;

	if (!old_proto_knows(x_proto, p->op))
		return 0;

	return serialize((void *)p, x_proto, buflen, (xdrproc_t)xdr_pax_msg, buf) != 0;
}
690 
/*
  Decode buflen bytes at buf into the Paxos message p using protocol
  version x_proto. If decoding fails, whatever was decoded into p so far
  is released with my_xdr_free. Returns the result of the XDR decode.
*/
int deserialize_msg(pax_msg *p, xcom_proto x_proto,  char *buf, uint32_t buflen)
{
	int const decoded = apply_xdr(x_proto, buf, buflen,
								  (xdrproc_t)xdr_pax_msg,
								  (void *)p, XDR_DECODE);

	if (decoded)
		return decoded;

	my_xdr_free((xdrproc_t)xdr_pax_msg, (char *)p);
	return decoded;
}
702 
/* Better checksum */
/* Byte-indexed lookup table for the checksum; filled in by init_crc32c() */
static uint32_t crc_table[256];

/*
  Build crc_table: for every byte value, run eight shift steps with the
  constant 0x82F63B78. Must be called before crc32c_hash().
*/
void init_crc32c()
{
	uint32_t byte;

	for (byte = 0; byte < 256; byte++) {
		uint32_t rem = byte;
		int	bit = 8;

		while (bit-- > 0) {
			uint32_t const shifted = rem >> 1;
			rem = (rem & 1) ? (0x82F63B78 ^ shifted) : shifted;
		}
		crc_table[byte] = rem;
	}
}


#define CRC32CSTART 0xFFFFFFFF

/* purecov: begin deadcode */
/*
  Checksum of the bytes in [buf, end). The running value starts at
  CRC32CSTART and is inverted at the end; an empty range gives 0.
*/
uint32_t crc32c_hash(char *buf, char *end)
{
	unsigned char const *cur = (unsigned char const *)buf;
	unsigned char const *stop = (unsigned char const *)end;
	uint32_t crc = CRC32CSTART;

	while (cur < stop) {
		crc = (crc >> 8) ^ crc_table[(crc ^ *cur) & 0xFF];
		cur++;
	}
	return crc ^ 0xFFFFFFFF;
}
/* purecov: end */
734 
/* {{{ Paxos servers (nodes) */

/*
  Array of servers, only maxservers entries actually used.
  addsrv() appends at index maxservers; rmsrv() moves the last used entry
  into the freed slot and clears the vacated one, so the used entries
  stay contiguous at the front.
*/
static server *all_servers[SERVER_MAX];
static int	maxservers = 0;
740 
/*
  Create a new server.

  Allocates a zeroed server object for host srv and port, aborting the
  process if the allocation fails. The srv pointer is stored as is, not
  copied; freesrv() later releases it. Initialises the connection, the
  outgoing channel and the output buffer, and starts the tasks that
  serve it:
  - if srv/port is this node itself: a local_sender_task only;
  - otherwise: a sender_task and a reply_handler_task.

  The returned server carries one reference, taken here and released by
  shut_srv().
*/
static server *
mksrv(char *srv, xcom_port port)
{
	server * s;

	s = calloc(1, sizeof (* s));

	DBGOUT(FN; PTREXP(s); STREXP(srv));
	if (s == 0) {
		g_critical("out of memory");
		abort();
	}
	s->garbage = 0;
	s->invalid = 0;
	s->refcnt = 0;
	s->srv = srv;
	s->port = port;
	reset_connection(&s->con);
	s->active = 0.0;
	s->detected = 0.0;
	channel_init(&s->outgoing, type_hash("msg_link"));
	DBGOUT(FN; STREXP(srv); NDBG(port,d));
	if (xcom_mynode_match(srv, port)) { /* Short-circuit local messages */
		DBGOUT(FN; STRLIT("creating local sender"); STREXP(srv); NDBG(port,d));
		s->sender = task_new(local_sender_task, void_arg(s), "local_sender_task", XCOM_THREAD_DEBUG);
	}else{
		s->sender = task_new(sender_task, void_arg(s), "sender_task", XCOM_THREAD_DEBUG);
		DBGOUT(FN; STRLIT("creating sender and reply_handler"); STREXP(srv); NDBG(port,d));
		s->reply_handler = task_new(reply_handler_task, void_arg(s), "reply_handler_task", XCOM_THREAD_DEBUG);
	}
	reset_srv_buf(&s->out_buf);
	/*
	 Keep the server from being freed if the acceptor_learner_task calls
	 srv_unref on the server before the {local_,}server_task and
	 reply_handler_task begin.
	*/
	srv_ref(s);
	return s;
}
781 
782 
addsrv(char * srv,xcom_port port)783 static server *addsrv(char *srv, xcom_port port)
784 {
785 	server * s = mksrv(srv, port);
786 	assert(all_servers[maxservers] == 0);
787 	assert(maxservers < SERVER_MAX);
788 	all_servers[maxservers] = s;
789 	MAY_DBG(FN; PTREXP(all_servers[maxservers]); STREXP(all_servers[maxservers]->srv); NDBG(all_servers[maxservers]->port, d); NDBG(maxservers, d));
790 	maxservers++;
791 	return s;
792 }
793 
794 
rmsrv(int i)795 static void rmsrv(int i)
796 {
797 	assert(all_servers[i]);
798 	assert(maxservers > 0);
799 	assert(i < maxservers);
800 	MAY_DBG(FN; PTREXP(all_servers[i]); STREXP(all_servers[i]->srv); NDBG(all_servers[i]->port, d); NDBG(i, d));
801 	maxservers--;
802 	all_servers[i] = all_servers[maxservers];
803 	all_servers[maxservers] = 0;
804 }
805 
806 
init_collect()807 static void	init_collect()
808 {
809 	int	i;
810 
811 	for (i = 0; i < maxservers; i++) {
812 		assert(all_servers[i]);
813 		all_servers[i]->garbage = 1;
814 	}
815 }
816 
817 
818 extern void	get_all_site_defs(site_def ***s, uint32_t *n);
819 
mark_site_servers(site_def * site)820 static void mark_site_servers(site_def *site)
821 {
822 	u_int i;
823 	for (i = 0; i < get_maxnodes(site); i++) {
824 		server * s = site->servers[i];
825 		assert(s);
826 		s->garbage = 0;
827 	}
828 }
829 
830 
mark()831 static void	mark()
832 {
833 	site_def * *site;
834 	uint32_t	n;
835 	uint32_t	i;
836 
837 	get_all_site_defs(&site, &n);
838 
839 	for (i = 0; i < n; i++) {
840 		if (site[i]) {
841 			mark_site_servers(site[i]);
842 		}
843 	}
844 }
845 
846 
sweep()847 static void	sweep()
848 {
849 	int	i = 0;
850 	while (i < maxservers) {
851 		server *s = all_servers[i];
852 		assert(s);
853 		if (s->garbage) {
854 			DBGOUT(FN; STREXP(s->srv));
855 			shut_srv(s);
856 			rmsrv(i);
857 		} else {
858 			i++;
859 		}
860 	}
861 }
862 
863 
garbage_collect_servers()864 void garbage_collect_servers()
865 {
866 	DBGOUT(FN);
867 	init_collect();
868 	mark();
869 	sweep();
870 }
871 
872 
873 /* Free a server */
freesrv(server * s)874 static void freesrv(server *s)
875 {
876 	X_FREE(s->srv);
877 	X_FREE(s);
878 }
879 
880 
/*
  Time of the last recorded activity of node i in site s, or 0.0 if the
  site has no server object in that slot.
*/
double	server_active(site_def const *s, node_no i)
{
	server const *srv = s->servers[i];

	return srv ? srv->active : 0.0;
}
888 
889 
890 /* Shutdown server */
shut_srv(server * s)891 static void shut_srv(server *s)
892 {
893 	if (!s)
894 		return;
895 	DBGOUT(FN; PTREXP(s); STREXP(s->srv));
896 
897 	shutdown_connection(&s->con);
898 
899 	/* Tasks will free the server object when they terminate */
900 	if (s->sender)
901 		task_terminate(s->sender);
902 	if (s->reply_handler)
903 		task_terminate(s->reply_handler);
904 
905 	// Allow the server to be freed. This unref pairs with the ref from mksrv.
906 	srv_unref(s);
907 }
908 
909 
/* Take a reference on server s and return the new reference count */
int	srv_ref(server *s)
{
	assert(s->refcnt >= 0);

	return ++s->refcnt;
}
916 
917 
/*
  Drop a reference on server s. When the last reference goes away the
  server is freed and 0 is returned; otherwise the remaining reference
  count is returned.

  Every unref must pair with an earlier srv_ref, so the count has to be
  positive on entry. An unref at zero would drive the count negative and
  the server would never be freed.
*/
int	srv_unref(server *s)
{
	assert(s->refcnt > 0);
	s->refcnt--;
	if (s->refcnt == 0) {
		freesrv(s);
		return 0;
	}
	return s->refcnt;
}
928 
929 
930 /* }}} */
931 
/*
  Listen for connections on socket and create a handler task.

  Task function. arg carries the listening fd. Each accepted socket is
  first offered to the optional accept callback; a socket the callback
  rejects is closed and then handled like a failed accept. For every
  accepted socket an acceptor_learner_task is started.

  The loop ends once xcom_shutdown is set, or when the last accept
  neither produced a usable fd nor was counted as refused. The listening
  socket is closed on the way out.
*/
int tcp_server(task_arg arg) {
  DECL_ENV
  int fd;
  int cfd;
  int refused;
  END_ENV;
  TASK_BEGIN
  ep->fd = get_int_arg(arg);
  ep->refused = 0;
  unblock_fd(ep->fd);
  DBGOUT(FN; NDBG(ep->fd, d););
  G_MESSAGE("XCom protocol version: %d", my_xcom_version);
  G_MESSAGE(
      "XCom initialized and ready to accept incoming connections on port %d",
      xcom_listen_port);
  do {
    TASK_CALL(accept_tcp(ep->fd, &ep->cfd));
    /* Callback to check that the file descriptor is accepted. */
    if (xcom_socket_accept_callback &&
        !xcom_socket_accept_callback(ep->cfd, get_site_def())) {
      shut_close_socket(&ep->cfd);
      ep->cfd = -1;
    }
    if(ep->cfd == -1){
      G_DEBUG("accept failed");
      ep->refused = 1;
      /* Back off briefly before trying to accept again */
      TASK_DELAY(0.1);
    } else {
      ep->refused = 0;
      DBGOUT(FN; NDBG(ep->cfd, d););
      task_new(acceptor_learner_task, int_arg(ep->cfd), "acceptor_learner_task",
               XCOM_THREAD_DEBUG);
    }
  } while (!xcom_shutdown && (ep->cfd >= 0 || ep->refused));
  FINALLY
  assert(ep->fd >= 0);
  shut_close_socket(&ep->fd);
  TASK_END;
}
972 
#ifdef XCOM_HAVE_OPENSSL
/*
  Perform the client side of the SSL handshake on connection con, which
  must already have a connected, non-blocking fd.

  Only usable inside a task function: the expansion uses TASK_YIELD,
  TERMINATE and wait_io(stack, ...), and expects a variable `ret` of type
  result to be in scope at the point of use.

  SSL_connect is retried for as long as the error is retryable and is
  WANT_READ or WANT_WRITE, waiting for the socket in the corresponding
  direction and yielding in between. If the fd becomes invalid while
  waiting, if the handshake fails, or if ssl_verify_server_cert() returns
  non-zero for hostname, the SSL state is freed, the connection is closed
  and the task is terminated. On success the connection is marked
  connected.

  NOTE(review): the result of SSL_new is used without a NULL check.
*/
#define SSL_CONNECT(con, hostname) {									\
		con.ssl_fd = SSL_new(client_ctx);								\
		SSL_set_fd(con.ssl_fd, con.fd);									\
		ERR_clear_error();												\
		ret.val = SSL_connect(con.ssl_fd);								\
		ret.funerr = to_ssl_err(SSL_get_error(con.ssl_fd, ret.val));	\
		while (ret.val != SSL_SUCCESS && can_retry(ret.funerr)) {		\
			if (from_ssl_err(ret.funerr) == SSL_ERROR_WANT_READ){		\
				wait_io(stack, con.fd, 'r');							\
			}else if (from_ssl_err(ret.funerr) == SSL_ERROR_WANT_WRITE){ \
				wait_io(stack, con.fd, 'w');							\
			}else{														\
				break;													\
			}															\
			TASK_YIELD;													\
			SET_OS_ERR(0);												\
			if (con.fd < 0) {											\
				ssl_free_con(&con);										\
				close_connection(&con);									\
				TERMINATE;												\
			}															\
																		\
			ERR_clear_error();											\
			ret.val = SSL_connect(con.ssl_fd);							\
			ret.funerr = to_ssl_err(SSL_get_error(con.ssl_fd, ret.val)); \
		}																\
            															\
		if (ret.val != SSL_SUCCESS) {									\
			ssl_free_con(&con);											\
			close_connection(&con);										\
			TERMINATE;													\
		}else{															\
			if (ssl_verify_server_cert(con.ssl_fd, hostname))			\
			{															\
				ssl_free_con(&con);										\
				close_connection(&con);									\
				TERMINATE;												\
			}															\
			set_connected(&con, CON_FD);								\
		}																\
	}
#endif
1016 
/* Stamp server s with the current task time as the moment it was last detected */
void server_detected(server *s)
{
	s->detected = task_now();
}
1018 
/*
  Try to connect to another node.

  Task function. Opens a TCP connection to s->srv : s->port and stores
  the fd in s->con.fd. If that fails, the failure is only logged and
  s->con.fd stays negative. On success the socket gets TCP_NODELAY
  treatment (when NAGLE is 0), is made non-blocking, optionally goes
  through the SSL handshake, and the connection is then marked connected
  and the server noted as alive.

  When SSL is in use, a failed handshake terminates the task from inside
  SSL_CONNECT.
*/
static int	dial(server *s)
{
	DECL_ENV
	    int	dummy;
	END_ENV;

	TASK_BEGIN
	    DBGOUT(FN; STRLIT(" dial "); NPUT(get_nodeno(get_site_def()), u);
             STRLIT(s->srv); NDBG(s->port, u));
	TASK_CALL(connect_tcp(s->srv, s->port, &s->con.fd));
	/* DBGOUT(FN; NDBG(s->con.fd,d);); */
	if (s->con.fd < 0) {
		DBGOUT(FN; STRLIT("could not dial "); STRLIT(s->srv); NDBG(s->port, u); );
	} else {
		if (NAGLE == 0) {
			set_nodelay(s->con.fd);
		}

		unblock_fd(s->con.fd);
#ifdef XCOM_HAVE_OPENSSL
		if (xcom_use_ssl()) {
			/* SSL_CONNECT expects a variable named ret in scope */
			result ret = {0,0};
			SSL_CONNECT(s->con, s->srv);
		}
#endif
		DBGOUT(FN; STRLIT("connected to "); STRLIT(s->srv); NDBG(s->con.fd, d);
           NDBG(s->port, u));
		set_connected(&s->con, CON_FD);
		alive(s);
	}
	FINALLY
	    TASK_END;
}
1053 
1054 
1055 /* Send message by putting it in the server queue */
send_msg(server * s,node_no from,node_no to,uint32_t group_id,pax_msg * p)1056 int	send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
1057 {
1058 	assert(p);
1059 	assert(s);
1060 	 {
1061 		msg_link * link = msg_link_new(p, to);
1062 		alive(s); /* Note activity */
1063 		MAY_DBG(FN; PTREXP(&s->outgoing);
1064 		    COPY_AND_FREE_GOUT(dbg_msg_link(link));
1065 		    );
1066 		p->from = from;
1067 		p->to = to;
1068 		p->group_id = group_id;
1069 		p->max_synode = get_max_synode();
1070 		p->delivered_msg = get_delivered_msg();
1071 		MAY_DBG(FN; PTREXP(p); STREXP(s->srv); NDBG(p->from, d); NDBG(p->to, d); NDBG(p->group_id, u));
1072 		channel_put(&s->outgoing, &link->l);
1073 	}
1074 	return 0;
1075 }
1076 
1077 
/*
  Queue p for node `to` of site s, using this node's number as sender.
  Nothing is sent if the server slot is empty, the server is marked
  invalid, or p is NULL. Always returns 0.
*/
static inline int	_send_server_msg(site_def const *s, node_no to, pax_msg *p)
{
	server *dest;

	assert(s);
	assert(s->servers[to]);

	dest = s->servers[to];
	if (!dest || dest->invalid != 0 || !p)
		return 0;

	send_msg(dest, s->nodeno, to, get_group_id(s), p);
	return 0;
}
1087 
1088 
/* Public entry point for sending p to node `to` of site s; see _send_server_msg */
int send_server_msg(site_def const *s, node_no to, pax_msg *p)
{
	int const rc = _send_server_msg(s, to, p);

	return rc;
}
1093 
/*
 * Queue p for nodes 0 .. max-1 of site s.
 * Returns the result of the last _send_server_msg call, or 0 when no call
 * was made (s is NULL or max is 0).
 */
static inline int send_loop(site_def const *s, node_no max, pax_msg *p, const char *dbg MY_ATTRIBUTE((unused)))
{
	int	retval = 0;
	node_no i;

	assert(s);
	if (!s) {
		return retval;
	}

	for (i = 0; i < max; i++) {
		MAY_DBG(FN; STRLIT(dbg); STRLIT(" "); NDBG(i, u); NDBG(max, u); PTREXP(p));
		retval = _send_server_msg(s, i, p);
	}
	return retval;
}
1107 
1108 
1109 /* Send to all servers in site */
send_to_all_site(site_def const * s,pax_msg * p,const char * dbg)1110 int	send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
1111 {
1112 	int	retval = 0;
1113 	retval = send_loop(s, get_maxnodes(s), p, dbg);
1114 	return retval;
1115 }
1116 
1117 /* Send to all servers */
send_to_all(pax_msg * p,const char * dbg)1118 int	send_to_all(pax_msg *p, const char *dbg)
1119 {
1120 	return send_to_all_site(find_site_def(p->synode), p, dbg);
1121 }
1122 
1123 
/*
 * Queue p for every node of site s except the local node (s->nodeno).
 * With MAXACCEPT defined, only the first MIN(get_maxnodes(s), MAXACCEPT)
 * nodes are considered. Returns the result of the last _send_server_msg
 * call, or 0 when no call was made.
 */
static inline int send_other_loop(site_def const *s, pax_msg *p, const char *dbg MY_ATTRIBUTE((unused)))
{
	int	retval = 0;
	node_no i;
#ifdef MAXACCEPT
	node_no max = MIN(get_maxnodes(s), MAXACCEPT);
#else
	node_no max;
	assert(s);
	max = get_maxnodes(s);
#endif
	for (i = 0; i < max; i++) {
		if (i == s->nodeno) {
			continue; /* skip ourselves */
		}
		MAY_DBG(FN; STRLIT(dbg); STRLIT(" "); NDBG(i, u); NDBG(max, u); PTREXP(p));
		retval = _send_server_msg(s, i, p);
	}
	return retval;
}
1143 
1144 
1145 /* Send to other servers */
send_to_others(site_def const * s,pax_msg * p,const char * dbg)1146 int	send_to_others(site_def const *s, pax_msg *p, const char *dbg)
1147 {
1148 	int	retval = 0;
1149 	retval = send_other_loop(s, p, dbg);
1150 	return retval;
1151 }
1152 
1153 /* Send to some other live server, round robin */
send_to_someone(site_def const * s,pax_msg * p,const char * dbg MY_ATTRIBUTE ((unused)))1154 int	send_to_someone(site_def const *s, pax_msg *p, const char *dbg MY_ATTRIBUTE((unused)))
1155 {
1156 	int	retval = 0;
1157 	static node_no i = 0;
1158 	node_no prev = 0;
1159 #ifdef MAXACCEPT
1160 	node_no max = MIN(get_maxnodes(s), MAXACCEPT);
1161 #else
1162 	node_no max;
1163 	assert(s);
1164 	max = get_maxnodes(s);
1165 #endif
1166 	/* DBGOUT(FN; NDBG(max,u); NDBG(s->maxnodes,u)); */
1167 	assert(max > 0);
1168 	prev = i % max;
1169 	i = (i + 1) % max;
1170 	while (i != prev) {
1171 		/* DBGOUT(FN; NDBG(i,u); NDBG(prev,u)); */
1172 		if (i != s->nodeno && !may_be_dead(s->detected, i, task_now())) {
1173 			MAY_DBG(FN; STRLIT(dbg); NDBG(i, u); NDBG(max, u); PTREXP(p));
1174 			retval = _send_server_msg(s, i, p);
1175 			break;
1176 		}
1177 		i = (i + 1) % max;
1178 	}
1179 	return retval;
1180 }
1181 
1182 
1183 #ifdef MAXACCEPT
1184 /* Send to all acceptors */
send_to_acceptors(pax_msg * p,const char * dbg)1185 int	send_to_acceptors(pax_msg *p, const char *dbg)
1186 {
1187 	site_def const *s = find_site_def(p->synode);
1188 	int	retval = 0;
1189 	int	i;
1190 	retval = send_loop(s, MIN(MAXACCEPT, s->maxnodes), p, dbg);
1191 	return retval;
1192 }
1193 
1194 
1195 #else
1196 /* Send to all acceptors */
send_to_acceptors(pax_msg * p,const char * dbg)1197 int	send_to_acceptors(pax_msg *p, const char *dbg)
1198 {
1199 	return send_to_all(p, dbg);
1200 }
1201 
1202 
1203 #endif
1204 
1205 /* Used by :/int.*read_msg */
1206 /**
1207   Reads n bytes from connection rfd without buffering reads.
1208 
1209   @param[in]     rfd Pointer to open connection.
1210   @param[out]    p   Output buffer.
1211   @param[in]     n   Number of bytes to read.
1212   @param[out]    s   Pointer to server.
1213   @param[out]    ret Number of bytes read, or -1 if failure.
1214 
1215   @return
1216     @retval 0 if task should terminate.
1217     @retval 1 if it should continue.
1218 */
read_bytes(connection_descriptor const * rfd,char * p,uint32_t n,server * s,int64_t * ret)1219 static int	read_bytes(connection_descriptor const * rfd, char *p, uint32_t n,
1220                        server *s, int64_t *ret)
1221 {
1222 	DECL_ENV
1223 	    uint32_t left;
1224 	char	*bytes;
1225 	END_ENV;
1226 
1227 	    int64_t	nread = 0;
1228 
1229 	TASK_BEGIN
1230 
1231 	    ep->left = n;
1232 	ep->bytes = (char *)p;
1233 	while (ep->left > 0) {
1234 		MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, ll); NDBG(ep->left, u));
1235 		TASK_CALL(task_read(rfd, ep->bytes,
1236                         ep->left >= INT_MAX ? INT_MAX : (int)ep->left,
1237                         &nread));
1238 		MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, ll); NDBG(ep->left, u));
1239 		if (nread == 0) {
1240 			TASK_RETURN(0);
1241 		} else if (nread < 0) {
1242 			DBGOUT(FN; NDBG(nread, ll));
1243 			TASK_FAIL;
1244 		} else {
1245 			ep->bytes += nread;
1246 			ep->left -= (uint32_t)nread;
1247 			if (s) server_detected(s);
1248 		}
1249 	}
1250 	assert(ep->left == 0);
1251 	TASK_RETURN(n);
1252 	FINALLY
1253 	    TASK_END;
1254 }
1255 
1256 /**
1257   Reads n bytes from connection rfd with buffering reads.
1258 
1259   @param[in]     rfd Pointer to open connection.
1260   @param[in,out] buf Used for buffering reads.
1261                      Originally initialized by caller, maintained by buffered_read_bytes.
1262   @param[out]    p   Output buffer.
1263   @param[in]     n   Number of bytes to read
1264   @param[out]    s   Pointer to server.
1265   @param[out]    ret Number of bytes read, or -1 if failure.
1266 
1267   @return
1268     @retval 0 if task should terminate.
1269     @retval 1 if it should continue.
1270 */
buffered_read_bytes(connection_descriptor const * rfd,srv_buf * buf,char * p,uint32_t n,server * s,int64_t * ret)1271 static int	buffered_read_bytes(connection_descriptor const * rfd, srv_buf *buf, char *p, uint32_t n, server *s, int64_t *ret)
1272 {
1273 	DECL_ENV
1274 	    uint32_t	left;
1275 	char	*bytes;
1276 	END_ENV;
1277 	uint32_t nget = 0;
1278 
1279 	TASK_BEGIN
1280 	ep->left = n;
1281 	ep->bytes = (char *)p;
1282 
1283 	/* First, try to get bytes from buffer */
1284 	nget = get_srv_buf(buf, ep->bytes, n);
1285 	ep->bytes += nget;
1286 	ep->left -= nget;
1287 
1288 	if(ep->left >= srv_buf_capacity(buf)){
1289 		/* Too big, do direct read of rest */
1290 		TASK_CALL(read_bytes(rfd, ep->bytes, ep->left, s, ret));
1291 		if(*ret <= 0){
1292 			TASK_FAIL;
1293 		}
1294 		ep->left -= (uint32_t)(*ret);
1295 	}else{
1296 		/* Buffered read makes sense */
1297 		while(ep->left > 0){
1298 			int64_t	nread = 0;
1299 			/* Buffer is empty, reset and read */
1300 			reset_srv_buf(buf);
1301 			MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, d););
1302 
1303 			TASK_CALL(task_read(rfd, srv_buf_insert_ptr(buf),
1304                           (int)srv_buf_free_space(buf), &nread));
1305 			MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, d););
1306 			if (nread == 0) {
1307 				TASK_RETURN(0);
1308 			} else if (nread < 0) {
1309 				DBGOUT(FN; NDBG(nread, d));
1310 				TASK_FAIL;
1311 			}else{
1312 				/* Update buffer to reflect number of bytes read */
1313 				advance_insert_ptr(buf, (uint)nread);
1314 				nget = get_srv_buf(buf, ep->bytes, ep->left);
1315 				ep->bytes += nget;
1316 				ep->left -= nget;
1317 				if (s) server_detected(s);
1318 			}
1319 		}
1320 	}
1321 	assert(ep->left == 0);
1322 	TASK_RETURN(n);
1323 	FINALLY
1324 	    TASK_END;
1325 }
1326 
/*
 * Decode the message size, message type and tag fields from header_buf.
 * The field positions come from the LENGTH_PTR, X_TYPE and X_TAG_PTR macros.
 */
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
                    x_msg_type *x_type, unsigned int *tag)
{
	*msgsize = get_32(LENGTH_PTR(header_buf));
	*x_type  = (x_msg_type)header_buf[X_TYPE];
	*tag     = get_16(X_TAG_PTR(header_buf));
}
1334 
/*
 * Encode the message size, message type and tag fields into header_buf.
 * Counterpart of get_header_1_0; uses the same LENGTH_PTR, X_TYPE and
 * X_TAG_PTR positions.
 */
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
                    x_msg_type x_type, unsigned int tag)
{
	put_32(LENGTH_PTR(header_buf), msgsize);
	header_buf[X_TYPE] = (unsigned char)x_type;
	put_16(X_TAG_PTR(header_buf), tag);
}
1342 
1343 /* See also :/static .*read_bytes */
read_msg(connection_descriptor * rfd,pax_msg * p,server * s,int64_t * ret)1344 int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
1345 {
1346 	int deserialize_ok = 0;
1347 
1348 	DECL_ENV
1349 	    int64_t	n;
1350 	char	*bytes;
1351 	unsigned char	header_buf[MSG_HDR_SIZE];
1352 	xcom_proto x_version;
1353 	uint32_t	msgsize;
1354 	x_msg_type x_type;
1355 	unsigned int tag;
1356 	END_ENV;
1357 
1358 	TASK_BEGIN
1359 	do{
1360 		ep->bytes = NULL;
1361 		/* Read length field, protocol version, and checksum */
1362 		ep->n = 0;
1363 		TASK_CALL(read_bytes(rfd, (char*)ep->header_buf, MSG_HDR_SIZE, s, &ep->n));
1364 
1365 		if (ep->n != MSG_HDR_SIZE) {
1366 			DBGOUT(FN; NDBG(ep->n, u));
1367 			TASK_FAIL;
1368 		}
1369 
1370 		/* Check the protocol version before doing anything else */
1371 		ep->x_version = read_protoversion(VERS_PTR(ep->header_buf));
1372 		get_header_1_0(ep->header_buf, &ep->msgsize, & ep->x_type, &ep->tag);
1373 		if(ep->x_type == x_version_req){
1374 			/* Negotiation request. See what we can offer */
1375 			rfd->x_proto = negotiate_protocol(ep->x_version);
1376 			DBGOUT(STRLIT("incoming connection will use protcol version ");
1377 			   NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
1378 			ADD_EVENTS(
1379 				add_event(string_arg("incoming connection will use protcol version"));
1380 				add_event(string_arg(xcom_proto_to_str(rfd->x_proto)));
1381 			);
1382 			if(rfd->x_proto > my_xcom_version)
1383 				TASK_FAIL;
1384 			set_connected(rfd, CON_PROTO);
1385 			TASK_CALL(send_proto(rfd, rfd->x_proto,  x_version_reply, ep->tag, ret));
1386 		} else if (ep->x_type == x_version_reply){
1387 			/* Mark connection with negotiated protocol version */
1388 			if(rfd->snd_tag == ep->tag){
1389 				rfd->x_proto = ep->x_version;
1390 				DBGOUT(STRLIT("peer connection will use protcol version ");
1391 					   NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
1392 
1393 				ADD_EVENTS(
1394 					add_event(string_arg("peer connection will use protcol version"));
1395 					add_event(string_arg( xcom_proto_to_str(rfd->x_proto)));
1396 				);
1397 				if(rfd->x_proto > my_xcom_version || rfd->x_proto == x_unknown_proto)
1398 					TASK_FAIL;
1399 
1400 				set_connected(rfd, CON_PROTO);
1401 			}
1402 		}
1403 	}while(ep->x_type != x_normal);
1404 
1405 #ifdef XCOM_PARANOID
1406 	assert(check_protoversion(ep->x_version, rfd->x_proto));
1407 #endif
1408 	if (!check_protoversion(ep->x_version, rfd->x_proto)) {
1409 		TASK_FAIL;
1410 	}
1411 
1412 	/* OK, we can grok this version */
1413 
1414 	/* Allocate buffer space for message */
1415 	ep->bytes = calloc(1, ep->msgsize);
1416 	if(!ep->bytes){
1417 		TASK_FAIL;
1418 	}
1419 
1420 	/* Read message */
1421 	ep->n = 0;
1422 	TASK_CALL(read_bytes(rfd, ep->bytes, ep->msgsize, s, &ep->n));
1423 
1424 	if (ep->n > 0) {
1425 		/* Deserialize message */
1426 		deserialize_ok = deserialize_msg(p, rfd->x_proto, ep->bytes, ep->msgsize);
1427 		MAY_DBG(FN; STRLIT(" deserialized message"));
1428 
1429 	}
1430 	/* Deallocate buffer */
1431 	X_FREE(ep->bytes);
1432 	if (ep->n <= 0 || !deserialize_ok) {
1433 		DBGOUT(FN; NDBG(ep->n, d); NDBG(deserialize_ok,d));
1434 		TASK_FAIL;
1435 	}
1436 	TASK_RETURN(ep->n);
1437 	FINALLY
1438 		TASK_END;
1439 }
1440 
/*
 * Task: read one message from rfd into p, going through the read buffer buf.
 *
 * Same protocol handling as read_msg, but all reads use
 * buffered_read_bytes. Headers are read until one of type x_normal
 * arrives; x_version_req headers are answered with a negotiated
 * x_version_reply, and x_version_reply headers whose tag equals
 * rfd->snd_tag set the connection's protocol version. The message body
 * (msgsize bytes) is then read into a temporary buffer, deserialized into
 * p, and the buffer is freed.
 *
 * *ret receives the number of body bytes read on success.
 */
int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf,
                      pax_msg *p, server *s, int64_t *ret)
{
	int deserialize_ok = 0;

	DECL_ENV
	    int64_t	n;
	    char	*bytes;
	    unsigned char	header_buf[MSG_HDR_SIZE];
	    xcom_proto x_version;
	    uint32_t	msgsize;
	    x_msg_type x_type;
	    unsigned int tag;
#ifdef NOTDEF
	    unsigned int	check;
#endif
	END_ENV;

	TASK_BEGIN
	do {
		ep->bytes = NULL;
		/* Read length field, protocol version, and checksum */
		ep->n = 0;
		TASK_CALL(buffered_read_bytes(rfd, buf, (char*)ep->header_buf, MSG_HDR_SIZE, s, &ep->n));

		if (ep->n != MSG_HDR_SIZE) {
			DBGOUT(FN; NDBG(ep->n, u));
			TASK_FAIL;
		}

		/* Check the protocol version before doing anything else */
		ep->x_version = read_protoversion(VERS_PTR(ep->header_buf));
		get_header_1_0(ep->header_buf, &ep->msgsize, & ep->x_type, &ep->tag);
		if (ep->x_type == x_version_req) {
			/* Negotiation request. See what we can offer */
			rfd->x_proto = negotiate_protocol(ep->x_version);
			DBGOUT(STRLIT("incoming connection will use protcol version ");
			   NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
			ADD_EVENTS(
				add_event(string_arg("incoming connection will use protcol version"));
				add_event(string_arg( xcom_proto_to_str(rfd->x_proto)));
			);
			if (rfd->x_proto > my_xcom_version) {
				TASK_FAIL;
			}
			set_connected(rfd, CON_PROTO);
			TASK_CALL(send_proto(rfd, rfd->x_proto,  x_version_reply, ep->tag, ret));
		} else if (ep->x_type == x_version_reply) {
			/* Mark connection with negotiated protocol version */
			if (rfd->snd_tag == ep->tag) {
				rfd->x_proto = ep->x_version;
				DBGOUT(STRLIT("peer connection will use protcol version ");
					   NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
				ADD_EVENTS(
					add_event(string_arg("peer connection will use protcol version"));
					add_event(string_arg( xcom_proto_to_str(rfd->x_proto)));
				);
				if (rfd->x_proto > my_xcom_version || rfd->x_proto == x_unknown_proto) {
					TASK_FAIL;
				}

				set_connected(rfd, CON_PROTO);
			}
		}
	} while (ep->x_type != x_normal);

#ifdef XCOM_PARANOID
	assert(check_protoversion(ep->x_version, rfd->x_proto));
#endif
	if (!check_protoversion(ep->x_version, rfd->x_proto)) {
		TASK_FAIL;
	}

	/* OK, we can grok this version */

	/* Allocate buffer space for message */
	ep->bytes = calloc(1, ep->msgsize);
	if (ep->bytes == NULL) {
		TASK_FAIL;
	}

	/* Read message */
	ep->n = 0;
	TASK_CALL(buffered_read_bytes(rfd, buf, ep->bytes, ep->msgsize, s, &ep->n));

	if (ep->n > 0) {
		/* Deserialize message */
		deserialize_ok = deserialize_msg(p, rfd->x_proto, ep->bytes, ep->msgsize);
		MAY_DBG(FN; STRLIT(" deserialized message"));
	}

	/* Deallocate buffer; it is no longer needed whether or not decoding worked */
	X_FREE(ep->bytes);
	if (ep->n <= 0 || !deserialize_ok) {
		DBGOUT(FN; NDBG(ep->n, d); NDBG(deserialize_ok,d));
		TASK_FAIL;
	}
	TASK_RETURN(ep->n);
	FINALLY
	TASK_END;
}
1539 
/*
 * Task: read a single message header from rfd and report its protocol
 * version, message type and tag through x_proto, x_type and tag.
 * The message size found in the header is decoded but not returned.
 * *ret receives the number of header bytes read (MSG_HDR_SIZE); the task
 * fails if a complete header could not be read.
 */
int	recv_proto(connection_descriptor const * rfd, xcom_proto *x_proto,
               x_msg_type *x_type, unsigned int *tag, int64_t *ret)
{
	DECL_ENV
	    int64_t	n;
	    unsigned char	header_buf[MSG_HDR_SIZE];
	    uint32_t	msgsize;
	END_ENV;

	TASK_BEGIN

	/* Read length field, protocol version, and checksum */
	ep->n = 0;
	/* No server is passed (0), so no liveness is recorded for this read */
	TASK_CALL(read_bytes(rfd, (char*)ep->header_buf, MSG_HDR_SIZE, 0, &ep->n));

	if (ep->n != MSG_HDR_SIZE) {
		DBGOUT(FN; NDBG(ep->n, ll));
		TASK_FAIL;
	}

	*x_proto = read_protoversion(VERS_PTR(ep->header_buf));
	get_header_1_0(ep->header_buf, &ep->msgsize, x_type, tag);
	TASK_RETURN(ep->n);
	FINALLY
	TASK_END;
}
1566 
1567 
1568 /* }}} */
1569 
1570 /* {{{ Sender task */
/* purecov: begin deadcode */
/* Return 1 if the low 16 bits of tag1 and tag2 are equal, 0 otherwise. */
inline int tag_check(unsigned int tag1, unsigned int tag2)
{
	/* XOR leaves a 1 bit wherever the tags differ; only bits 0..15 matter */
	return ((tag1 ^ tag2) & 0xffff) == 0;
}
/* purecov: end */
/* Return tag + 1 reduced to 16 bits, so 0xffff is followed by 0. */
static inline unsigned int incr_tag(unsigned int tag)
{
	return (tag + 1u) & 0xffff;
}
1582 
/*
 * Put a msg_link without a message (p == 0, destination VOID_NODE_NO) at
 * the front of the outgoing channel. sender_task treats a link whose p is
 * 0 as a protocol (re)negotiation request.
 */
static void start_protocol_negotiation(channel *outgoing)
{
	msg_link *link;

	link = msg_link_new(0, VOID_NODE_NO);
	MAY_DBG(FN; PTREXP(outgoing);
			COPY_AND_FREE_GOUT(dbg_msg_link(link));
			);
	channel_put_front(outgoing, &link->l);
}
1591 
1592 #define TAG_START 313
1593 
1594 /* Fetch messages from queue and send to other server.  Having a
1595    separate queue and task for doing this simplifies the logic since we
1596    never need to wait to send. */
sender_task(task_arg arg)1597 int	sender_task(task_arg arg)
1598 {
1599 	DECL_ENV
1600 	    server * s;
1601 	msg_link * link;
1602 	unsigned int tag;
1603 	END_ENV;
1604 
1605 	TASK_BEGIN
1606 
1607 	    ep->s = (server * )get_void_arg(arg);
1608 	ep->link = NULL;
1609 	ep->tag = TAG_START;
1610 	srv_ref(ep->s);
1611 
1612 	for(;;) {
1613 		/* Loop until connected */
1614 		while (!is_connected(&ep->s->con)) {
1615 			TASK_CALL(dial(ep->s));
1616 			if (ep->s->con.fd < 0) {
1617 				TASK_DELAY(1.000);
1618 			}
1619 			empty_msg_channel(&ep->s->outgoing);
1620 		}
1621 
1622 		reset_srv_buf(&ep->s->out_buf);
1623 
1624 		/* We are ready to start sending messages.
1625 		   Insert a message in the input queue to negotiate the protocol.
1626 		*/
1627 		start_protocol_negotiation(&ep->s->outgoing);
1628 		while (is_connected(&ep->s->con)) {
1629 
1630 			int64_t	ret;
1631 			assert(!ep->link);
1632 			if (0 && link_empty(&ep->s->outgoing.data)) {
1633 				TASK_DELAY(0.1 * my_drand48());
1634 			}
1635 			/*      FWD_ITER(&ep->s->outgoing.data, msg_link,
1636               DBGOUT(FN; PTREXP(link_iter));
1637               );
1638       */
1639 			if (link_empty(&ep->s->outgoing.data)) {
1640 				TASK_CALL(flush_srv_buf(ep->s, &ret));
1641 			}
1642 			CHANNEL_GET(&ep->s->outgoing, &ep->link, msg_link);
1643 			 {
1644 				int64_t	ret;
1645 				/* DBGOUT(FN; PTREXP(stack); PTREXP(ep->link)); */
1646 				MAY_DBG(FN; PTREXP(&ep->s->outgoing);
1647 				    COPY_AND_FREE_GOUT(dbg_msg_link(ep->link));
1648 				    );
1649 				MAY_DBG(FN; STRLIT(" extracted ");
1650 						COPY_AND_FREE_GOUT(dbg_linkage(&ep->link->l));
1651 				    );
1652 
1653 				/* If ep->link->p is 0, it is a protocol (re)negotiation request */
1654 				if(ep->link->p){
1655 						ADD_EVENTS(
1656 							add_event(string_arg("sending ep->link->p->synode"));
1657 							add_synode_event(ep->link->p->synode);
1658 							add_event(string_arg("to"));
1659 							add_event(uint_arg(ep->link->p->to));
1660 							add_event(string_arg(pax_op_to_str(ep->link->p->op)));
1661 						);
1662 						TASK_CALL(_send_msg(ep->s, ep->link->p, ep->link->to, &ret));
1663 						if(ret < 0){
1664 							goto next;
1665 						}
1666 						ADD_EVENTS(
1667 							add_event(string_arg("sent ep->link->p->synode"));
1668 							add_synode_event(ep->link->p->synode);
1669 							add_event(string_arg("to"));
1670 							add_event(uint_arg(ep->link->p->to));
1671 							add_event(string_arg(pax_op_to_str(ep->link->p->op)));
1672 						);
1673 				} else {
1674 					set_connected(&ep->s->con, CON_FD);
1675 					/* Send protocol negotiation request */
1676 					do{
1677 						TASK_CALL(send_proto(&ep->s->con, my_xcom_version,  x_version_req, ep->tag, &ret));
1678 						if(!is_connected(&ep->s->con)){
1679 							goto next;
1680 						}
1681 						ep->tag = incr_tag(ep->tag);
1682 					}while(ret < 0);
1683 					G_DEBUG("sent negotiation request for protocol %d",my_xcom_version);
1684 					ADD_EVENTS(
1685 						add_event(string_arg("sent negotiation request for protocol"));
1686 						add_event(string_arg( xcom_proto_to_str(my_xcom_version)));
1687 					);
1688 
1689 					/* Wait until negotiation done.
1690 					   reply_handler_task will catch reply and change state */
1691 					while(!proto_done(&ep->s->con)){
1692 						TASK_DELAY(0.1);
1693 						if(!is_connected(&ep->s->con)){
1694 							goto next;
1695 						}
1696 					}
1697 					G_DEBUG("will use protocol %d",ep->s->con.x_proto);
1698 					ADD_EVENTS(
1699 						add_event(string_arg("will use protocol"));
1700 						add_event(string_arg( xcom_proto_to_str(ep->s->con.x_proto)));
1701 					);
1702 
1703 				}
1704 			}
1705 		next:
1706 			msg_link_delete(&ep->link);
1707 			/* TASK_YIELD; */
1708 		}
1709 	}
1710 	FINALLY
1711 	    empty_msg_channel(&ep->s->outgoing);
1712 	ep->s->sender = NULL;
1713 	srv_unref(ep->s);
1714 	if (ep->link)
1715 		msg_link_delete(&ep->link);
1716 	TASK_END;
1717 }
1718 
1719 /* Fetch messages from queue and send to self.
1720    Having a separate mechanism for internal communication
1721    avoids SSL blocking when trying to connect to same thread. */
local_sender_task(task_arg arg)1722 int	local_sender_task(task_arg arg)
1723 {
1724 	DECL_ENV
1725 	    server * s;
1726 	msg_link * link;
1727 	END_ENV;
1728 
1729 	TASK_BEGIN
1730 
1731 	    ep->s = (server * )get_void_arg(arg);
1732 	ep->link = NULL;
1733 	srv_ref(ep->s);
1734 
1735 	reset_srv_buf(&ep->s->out_buf);
1736 
1737 	while (!xcom_shutdown) {
1738 
1739 		assert(!ep->link);
1740 		CHANNEL_GET(&ep->s->outgoing, &ep->link, msg_link);
1741 		 {
1742 			/* DBGOUT(FN; PTREXP(stack); PTREXP(ep->link)); */
1743 			MAY_DBG(FN; PTREXP(&ep->s->outgoing);
1744 					COPY_AND_FREE_GOUT(dbg_msg_link(ep->link));
1745 			    );
1746 			MAY_DBG(FN; STRLIT(" extracted ");
1747 					COPY_AND_FREE_GOUT(dbg_linkage(&ep->link->l));
1748 			    );
1749 			assert(ep->link->p);
1750 			ep->link->p->to = ep->link->p->from;
1751 			dispatch_op(find_site_def(ep->link->p->synode), ep->link->p, NULL);
1752 		}
1753 		msg_link_delete(&ep->link);
1754 	}
1755 	FINALLY
1756 	    empty_msg_channel(&ep->s->outgoing);
1757 	ep->s->sender = NULL;
1758 	srv_unref(ep->s);
1759 	if (ep->link)
1760 		msg_link_delete(&ep->link);
1761 	TASK_END;
1762 }
1763 
1764 
1765 /* }}} */
1766 
/*
 * Return the index of the first ':' in a, or the index of the
 * terminating NUL when a contains no ':'.
 */
static int	end_token(char *a)
{
	int	i;

	for (i = 0; a[i] != 0; i++) {
		if (a[i] == ':') {
			break;
		}
	}
	return i;
}
1775 
1776 
/*
 * Return a newly allocated, NUL-terminated copy of the first i characters
 * of a, or NULL if the allocation fails. The caller owns the result and
 * releases it with free().
 */
static char	*token_copy(char *a, int i)
{
	char	*ret = calloc(1, (size_t)(i + 1));

	if (ret != NULL) {
		memcpy(ret, a, (size_t)i);
		ret[i] = 0;
	}
	return ret;
}
1790 
1791 
/*
 * Get host name from host:port string.
 * Returns a newly allocated copy of everything before the first ':'
 * (the whole string if there is none), or NULL if allocation fails.
 */
static char	*get_name(char *a)
{
	return token_copy(a, end_token(a));
}
1798 
/*
 * Get host name from host:port string.
 * Public wrapper around get_name. Returns a newly allocated string that
 * the caller must free, or NULL when a is NULL or allocation fails.
 * The NULL check mirrors the one in xcom_get_port.
 */
char	*xcom_get_name(char *a)
{
	return a ? get_name(a) : NULL;
}
1804 
1805 
1806 /* Get port from host:port string */
get_port(char * a)1807 static xcom_port get_port(char *a)
1808 {
1809 	int i = end_token(a);
1810 	if (a[i] != 0 ) {
1811 		int port = atoi(a+i+1);
1812 		if (number_is_valid_port(port))
1813 				return (xcom_port) port;
1814   }
1815   return 0;
1816 }
1817 
1818 
xcom_get_port(char * a)1819 xcom_port xcom_get_port(char *a)
1820 {
1821 	return a ? get_port(a): 0;
1822 }
1823 
1824 
/*
 * Search the first n entries of table for a server whose name (srv) and
 * port both match. Empty slots are skipped. Returns the first match, or 0
 * if there is none.
 */
static server *find_server(server *table[], int n, char *name, xcom_port port)
{
	int	i = 0;

	while (i < n) {
		server * s = table[i];
		/* FIXME should use IP address */
		if (s && s->port == port && strcmp(s->srv, name) == 0) {
			return s;
		}
		i++;
	}
	return 0;
}
1835 
1836 
/*
 * Fill s->servers from the node list of s.
 *
 * For every node address, an existing entry of all_servers with the same
 * name and port is reused (and its invalid flag cleared); otherwise a new
 * server is created with addsrv, using xcom_listen_port when the address
 * carries no usable port. Remaining slots up to NSERVERS are zeroed.
 * When operation is force_config_type, servers of the previous site
 * definition that are missing from s are marked invalid.
 * Does nothing when s is NULL.
 */
void update_servers(site_def *s, cargo_type operation)
{
	u_int	n;
	u_int	i;

	if (!s) {
		return;
	}

	n = s->nodes.node_list_len;

	DBGOUT(FN; NDBG(get_maxnodes(s), u); NDBG(n, d); PTREXP(s));

	for (i = 0; i < n; i++) {
		char	*addr = s->nodes.node_list_val[i].address;
		char	*name = get_name(addr);
		xcom_port	port = get_port(addr);
		server * sp = find_server(all_servers, maxservers, name, port);

		if (!sp) {
			/* No server? Create one */
			DBGOUT(FN; STRLIT("creating new server "); NDBG(i, d); STREXP(name));
			/* name is handed to addsrv and not freed here */
			if (port > 0)
				s->servers[i] = addsrv(name, port);
			else
				s->servers[i] = addsrv(name, xcom_listen_port);
		} else {
			DBGOUT(FN; STRLIT("re-using server "); NDBG(i, d); STREXP(name));
			free(name);
			s->servers[i] = sp;
			if (sp->invalid)
				sp->invalid= 0;
		}
	}

	/* Zero the rest */
	for (i = n; i < NSERVERS; i++) {
		s->servers[i] = 0;
	}

	/*
	If we have a force config, mark the servers that do not belong to this
	configuration as invalid
	*/
	if (operation == force_config_type) {
		const site_def* old_site_def= get_prev_site_def();
		invalidate_servers(old_site_def, s);
	}
}
1883 
1884 
1885 /*
1886  Make a diff between 2 site_defs and mark as invalid servers
1887  that do not belong to the new site_def.
1888  This is only to be used if we are forcing a configuration.
1889 */
invalidate_servers(const site_def * old_site_def,const site_def * new_site_def)1890 void invalidate_servers(const site_def* old_site_def,
1891 			const site_def* new_site_def) {
1892 	u_int node= 0;
1893 	for(; node < get_maxnodes(old_site_def); node++){
1894 		node_address* node_addr_from_old_site_def= &old_site_def->nodes.node_list_val[node];
1895 		if(!node_exists(node_addr_from_old_site_def, &new_site_def->nodes))
1896 		{
1897 			char *addr = node_addr_from_old_site_def->address;
1898 			char *name = get_name(addr);
1899 			xcom_port port = get_port(addr);
1900 			server *sp = find_server(all_servers, maxservers, name, port);
1901 			if (sp) {
1902 				sp->invalid= 1;
1903 			}
1904 			free(name);
1905 		}
1906 	}
1907 }
1908 
1909 
1910 /* Remove tcp connections which seem to be idle */
tcp_reaper_task(task_arg arg MY_ATTRIBUTE ((unused)))1911 int	tcp_reaper_task(task_arg arg MY_ATTRIBUTE((unused)))
1912 {
1913 	DECL_ENV
1914 	    int	dummy;
1915 	END_ENV;
1916 	TASK_BEGIN
1917 	    while (!xcom_shutdown) {
1918 		int i;
1919 		double	now = task_now();
1920 		for (i = 0; i < maxservers; i++) {
1921 			server * s = all_servers[i];
1922 			if (s && s->con.fd != -1 && (s->active + 10.0) < now) {
1923 				shutdown_connection(&s->con);
1924 			}
1925 		}
1926 		TASK_DELAY(1.0);
1927 	}
1928 	FINALLY
1929 	    TASK_END;
1930 }
1931 
1932 
/*
 * Terminate the client task: abort() the process if the envelope
 * (ep->s) asks for crash_on_error, otherwise leave the task via TERMINATE.
 * The argument is parenthesized and the body is wrapped in
 * do { } while (0) so the macro behaves as a single statement.
 */
#define TERMINATE_CLIENT(ep) do {					\
		if ((ep)->s->crash_on_error)				\
			abort();							\
		TERMINATE;								\
	} while (0)
1938 
1939 
1940 /*
1941 One-shot task to send a message to any xcom node via the client interface.
1942 The sender need not be part of any group.
1943 Any tcp connection may be used, as long as the message is a pax_msg
1944 serialized with serialize_msg. Doing it this way is simply the most
1945 convenient way of sending something to a specific address/port without blocking
1946 the task system. Error handling is very rudimentary.
1947 */
1948 /* purecov: begin deadcode */
1949 /* Try to connect to another node */
client_dial(char * srv,xcom_port port,connection_descriptor * con)1950 static int	client_dial(char *srv, xcom_port port, connection_descriptor *con)
1951 {
1952 	DECL_ENV
1953 	    int	dummy;
1954 	END_ENV;
1955 
1956 	TASK_BEGIN
1957 	    DBGOUT(FN; STRLIT(" dial "); NPUT(get_nodeno(get_site_def()), u); STRLIT(srv); NDBG(port, d));
1958 	TASK_CALL(connect_tcp(srv, port, &con->fd));
1959 	/* DBGOUT(FN; NDBG(con->fd,d);); */
1960 	if (con->fd < 0) {
1961 		DBGOUT(FN; STRLIT("could not dial "); STRLIT(srv); NDBG(port, d); );
1962 	} else {
1963 		if (NAGLE == 0) {
1964 			set_nodelay(con->fd);
1965 		}
1966 
1967 		unblock_fd(con->fd);
1968 #ifdef XCOM_HAVE_OPENSSL
1969 		if (xcom_use_ssl()) {
1970 			result ret = {0,0};
1971 			SSL_CONNECT((*con), srv);
1972 		}
1973 #endif
1974 		DBGOUT(FN; STRLIT("connected to "); STRLIT(srv); NDBG(con->fd, d); NDBG(port, d));
1975 		set_connected(con, CON_FD);
1976 	}
1977 	FINALLY
1978 	    TASK_END;
1979 }
1980 
1981 
/*
 * One-shot client task. The argument is an envelope holding the target
 * (srv, port) and the message p.
 *
 * Steps: dial until connected, send a protocol negotiation request tagged
 * TAG_START, read the reply header, and - if the reply is an
 * x_version_reply with the same tag and a known protocol - serialize p
 * with that protocol and write it to the connection. Any failure ends the
 * task through TERMINATE_CLIENT.
 *
 * On exit the connection is shut down and the serialized buffer, the
 * envelope's srv string, its message and the envelope itself are freed.
 */
int	client_task(task_arg arg)
{
	DECL_ENV
	    envelope * s;
	    u_int	buflen;
	    char	*buf;
	    connection_descriptor c_descriptor;
	    xcom_proto x_proto;
	    x_msg_type x_type;
	    unsigned int	tag;
	END_ENV;

	TASK_BEGIN

	ep->s = (envelope * )get_void_arg(arg);
	ep->c_descriptor.fd = -1;
#ifdef XCOM_HAVE_OPENSSL
	ep->c_descriptor.ssl_fd = 0;
#endif
	ep->buf = 0;
	ep->x_proto = my_xcom_version;

	/* Loop until connected */
	while (!is_connected(&ep->c_descriptor)) {
		TASK_CALL(client_dial(ep->s->srv, ep->s->port, &ep->c_descriptor));
		if (ep->c_descriptor.fd < 0) {
			TASK_DELAY(1.000);
		}
	}


#ifdef XCOM_HAVE_OPENSSL
	/* NOTE(review): client_dial already runs SSL_CONNECT on this
	   descriptor when SSL is enabled - confirm this second call is
	   intended */
	if (xcom_use_ssl()) {
		result ret = {
			0, 0		};
		SSL_CONNECT(ep->c_descriptor, ep->s->srv);
	}
#endif
	 {
		int64_t	sent;
		int64_t	n;

		/* Send protocol negotiation request */
		DBGOUT(FN);
		TASK_CALL(send_proto(&ep->c_descriptor, my_xcom_version,  x_version_req, TAG_START, &sent));
		if (sent < 0) {
			TERMINATE_CLIENT(ep);
		}

		DBGOUT(FN);
		/* Wait for answer and read protocol version */
		TASK_CALL(recv_proto(&ep->c_descriptor, &ep->x_proto, &ep->x_type, &ep->tag, &n));
		if (n < 0) {
			TERMINATE_CLIENT(ep);
		}

		DBGOUT(FN);
		if (ep->tag == TAG_START && ep->x_type == x_version_reply) {
			DBGOUT(STRLIT("client task will use protcol version ");
			    NDBG(ep->x_proto, u); STRLIT(xcom_proto_to_str(ep->x_proto)));
			if (ep->x_proto == x_unknown_proto) {
				TERMINATE_CLIENT(ep);
			}

			DBGOUT(FN);
			ep->c_descriptor.x_proto = ep->x_proto;
			/* Send message */
			serialize_msg(ep->s->p, ep->c_descriptor.x_proto, &ep->buflen, &ep->buf);
			if (ep->buflen) {
				DBGOUT(FN);
				TASK_CALL(task_write(&ep->c_descriptor, ep->buf, ep->buflen, &sent));
				if (ep->buflen != sent) {
					DBGOUT(FN; STRLIT("write failed "); STRLIT(ep->s->srv); NDBG(ep->s->port, d);
					    NDBG(ep->buflen, d); NDBG(sent, d));
					TERMINATE_CLIENT(ep);
				}
			}
		} else {
			/* Not the reply we were waiting for */
			DBGOUT(FN);
			TERMINATE_CLIENT(ep);
		}
	}

	FINALLY
	shutdown_connection(&ep->c_descriptor);
	X_FREE(ep->buf);
	free(ep->s->srv);
	XCOM_XDR_FREE(xdr_pax_msg, ep->s->p);
	free(ep->s);
	TASK_END;
}
/* purecov: end */
2073 
2074 #ifdef XCOM_HAVE_OPENSSL
/* Release the SSL object of con with SSL_free and clear the pointer. */
void ssl_free_con(connection_descriptor *con)
{
	SSL_free(con->ssl_fd);
	con->ssl_fd = NULL;
}
2080 
/*
 * Shut down and free the SSL object of con.
 * Does nothing unless con has both an open descriptor (fd >= 0) and an
 * SSL object.
 */
void ssl_shutdown_con(connection_descriptor *con)
{
	if (con->fd < 0 || con->ssl_fd == NULL) {
		return;
	}
	SSL_shutdown(con->ssl_fd);
	ssl_free_con(con);
}
2088 #endif
2089 
/*
 * Close the socket of con with shut_close_socket, set its descriptor to -1
 * and mark the connection CON_NULL.
 */
void close_connection(connection_descriptor *con)
{
	shut_close_socket(&con->fd);
	con->fd = -1;
	set_connected(con, CON_NULL);
}
2096 
/*
 * Fully shut down con: log the descriptor as an event, tear down the SSL
 * layer (when built with XCOM_HAVE_OPENSSL), then close the socket.
 */
void shutdown_connection(connection_descriptor *con)
{
	/* printstack(1); */
	ADD_EVENTS(
		add_event(string_arg("con->fd"));
		add_event(int_arg(con->fd));
	);

#ifdef XCOM_HAVE_OPENSSL
	/* SSL goes first, while the descriptor is still open */
	ssl_shutdown_con(con);
#endif

	close_connection(con);
}
2109 
/*
   Put a connection descriptor into the "not connected" state without
   touching any socket or SSL object: the fields are simply overwritten.
*/
void reset_connection(connection_descriptor *con)
{
	con->fd = -1;

#ifdef XCOM_HAVE_OPENSSL
	con->ssl_fd = NULL;
#endif

	set_connected(con, CON_NULL);
}
2118 
2119 /* The protocol version used by the group as a whole is the minimum of the
2120  maximum protocol versions in the config. */
common_xcom_version(site_def const * site)2121 xcom_proto common_xcom_version(site_def const *site)
2122 {
2123 	u_int i;
2124 	xcom_proto min_proto = my_xcom_version;
2125 	for(i = 0; i < site->nodes.node_list_len; i++){
2126 		min_proto = MIN(min_proto, site->nodes.node_list_val[i].proto.max_proto);
2127 	}
2128 	return min_proto;
2129 }
2130 
2131 static xcom_proto latest_common_proto = MY_XCOM_PROTO;
2132 
set_latest_common_proto(xcom_proto x_proto)2133 xcom_proto set_latest_common_proto(xcom_proto x_proto)
2134 {
2135 	return latest_common_proto = x_proto;
2136 }
2137 
get_latest_common_proto()2138 xcom_proto get_latest_common_proto()
2139 {
2140 	return latest_common_proto;
2141 }
2142 
2143 /* See which protocol we can use.
2144    Needs to be redefined as the protocol changes */
negotiate_protocol(xcom_proto proto_vers)2145 xcom_proto negotiate_protocol(xcom_proto proto_vers)
2146 {
2147 	/* Ensure that protocol will not be greater than
2148 	my_xcom_version */
2149 	if(proto_vers < my_min_xcom_version){
2150 		return x_unknown_proto;
2151 	}else if(proto_vers > my_xcom_version){
2152 		return my_xcom_version;
2153 	}else{
2154 		return proto_vers;
2155 	}
2156 }
2157 
2158 /*
2159    Encode and decode node_address with protocol version 0.
2160    This version is frozen forever, so having a handcrafted (in reality mostly copied)
2161    xdr function here is OK.
2162 */
2163 /* purecov: begin deadcode */
xdr_node_address_with_1_0(XDR * xdrs,node_address * objp)2164 bool_t xdr_node_address_with_1_0 (XDR *xdrs, node_address *objp)
2165 {
2166 	if (!xdr_string (xdrs, &objp->address, ~(u_int)0))
2167 		return FALSE;
2168 	if (!xdr_blob (xdrs, &objp->uuid))
2169 		return FALSE;
2170 	if (xdrs->x_op == XDR_DECODE) {
2171 		objp->proto.min_proto = x_1_0; /* A node which speaks protocol version 0 only supports version 0 */
2172 		objp->proto.max_proto = x_1_0;
2173 	}
2174 	return TRUE;
2175 }
2176 /* purecov: end */
2177 
2178 /* Encode and decode a node_list while respecting protocol version */
xdr_node_list_1_1(XDR * xdrs,node_list_1_1 * objp)2179 bool_t xdr_node_list_1_1(XDR *xdrs, node_list_1_1 *objp)
2180 {
2181 	xcom_proto vx = *((xcom_proto*)xdrs->x_public);
2182 	/* Select protocol encode/decode based on the x_public field of the xdr struct */
2183 	switch(vx){
2184 	case x_1_0:
2185 		return xdr_array (xdrs, (char **)&objp->node_list_val, (u_int *) &objp->node_list_len, NSERVERS,
2186 		sizeof (node_address), (xdrproc_t) xdr_node_address_with_1_0);
2187 	case x_1_1:
2188 	case x_1_2:
2189 		return xdr_array (xdrs, (char **)&objp->node_list_val, (u_int *) &objp->node_list_len, NSERVERS,
2190 		sizeof (node_address), (xdrproc_t) xdr_node_address);
2191 	default:
2192 		return FALSE;
2193 	}
2194 }
2195 
2196 /* Encode and decode a application data with added check that there is enough data when decoding */
xdr_checked_data(XDR * xdrs,checked_data * objp)2197 bool_t xdr_checked_data(XDR *xdrs, checked_data *objp)
2198 {
2199 	/*
2200 		Sanity check. x_handy is number of remaining bytes. For old XDR,
2201 		x_handy is int type. So type cast is used to eliminate a warning.
2202 	*/
2203 	if(xdrs->x_op == XDR_DECODE && (objp->data_len + 4 )> (u_int)xdrs->x_handy)
2204 		return FALSE;
2205 	return xdr_bytes(xdrs, (char **)&objp->data_val, (u_int *) &objp->data_len, 0xffffffff);
2206 }
2207 
xdr_pax_msg(XDR * xdrs,pax_msg * objp)2208 bool_t xdr_pax_msg(XDR *xdrs, pax_msg *objp)
2209 {
2210 	xcom_proto vx = *((xcom_proto * )xdrs->x_public);
2211 	/* Select protocol encode/decode based on the x_public field of the xdr struct */
2212 	switch (vx) {
2213 	case x_1_0:
2214 	case x_1_1:
2215 		if (!xdr_pax_msg_1_1(xdrs, (pax_msg_1_1*)objp))
2216 			return FALSE;
2217 		if (xdrs->x_op == XDR_DECODE)
2218 			objp->delivered_msg = get_delivered_msg(); /* Use our own minimum */
2219 		return TRUE;
2220 	case x_1_2:
2221 		return xdr_pax_msg_1_2(xdrs, objp);
2222 	default:
2223 		return FALSE;
2224 	}
2225 }
2226 
2227 
2228