1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <rpc/rpc.h>
24
25 #include <assert.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <stdlib.h>
29 #include <math.h>
30 #include <limits.h>
31
32 #include "xcom_common.h"
33 #include "x_platform.h"
34 #include "simset.h"
35 #include "xcom_vp.h"
36 #include "task.h"
37 #include "task_os.h"
38 #include "task_debug.h"
39 #include "node_no.h"
40 #include "server_struct.h"
41 #include "xcom_detector.h"
42 #include "site_struct.h"
43 #include "node_connection.h"
44 #include "node_list.h"
45 #include "xcom_transport.h"
46 #include "xcom_statistics.h"
47 #include "xcom_base.h"
48 #include "xcom_vp_str.h"
49 #include "xcom_msg_queue.h"
50 #include "xcom_memory.h"
51 #include "site_def.h"
52 #include "synode_no.h"
53
54 #ifdef XCOM_HAVE_OPENSSL
55 #include "openssl/ssl.h"
56 #include "openssl/err.h"
57 #endif
58 #include "sock_probe.h"
59 #include "retry.h"
60 #ifdef XCOM_HAVE_OPENSSL
61 #include "xcom_ssl_transport.h"
62 #endif
63
/* Protocol version spoken by this build; exported below as my_xcom_version */
#define MY_XCOM_PROTO x_1_2

xcom_proto const my_min_xcom_version = x_1_0; /* The minimum protocol version I am able to understand */
xcom_proto const my_xcom_version = MY_XCOM_PROTO; /* The maximum protocol version I am able to understand */

/* #define XCOM_ECM */

/* Number of slots in the all_servers table */
#define SERVER_MAX (2*NSERVERS)

/*
  Turn Nagle's algorithm on or off.
  When this is 0, dial() calls set_nodelay() on every new connection.
*/
static int const NAGLE = 0;

/* Defined elsewhere; tcp_server stops accepting once this is non-zero */
extern int xcom_shutdown;

/* Forward declaration: sweep() uses shut_srv before its definition */
static void shut_srv(server *s);

static xcom_port xcom_listen_port = 0; /* Port used by xcom, set by init_xcom_transport */

/*
  Optional callback consulted by tcp_server for each accepted socket.
  While it is NULL, no callback check is made.
*/
static xcom_socket_accept_cb xcom_socket_accept_callback= NULL;
83
84 /* purecov: begin deadcode */
/* Port matcher installed by init_xcom_transport: true iff port is our listen port */
static int pm(xcom_port port)
{
  int const is_listen_port = (port == xcom_listen_port);

  return is_listen_port;
}
89 /* purecov: end */
90
/*
  Install x as the callback that tcp_server consults for every accepted
  socket. Passing NULL removes the callback. Always returns 1.
*/
int set_xcom_socket_accept_cb(xcom_socket_accept_cb x)
{
  xcom_socket_accept_callback = x;

  return 1;
}
96
/*
  Remember the port xcom listens on and, if nobody has installed a port
  matcher yet, install the default one (pm).
*/
void init_xcom_transport(xcom_port listen_port)
{
  xcom_listen_port = listen_port;

  if (get_port_matcher() != 0) {
    return; /* A matcher is already in place, keep it */
  }
  /* purecov: begin deadcode */
  set_port_matcher(pm);
  /* purecov: end */
}
105
106
/* Mark the buffer as empty: both the insert and the extract offset go to 0 */
void reset_srv_buf(srv_buf *sb)
{
  sb->n = 0;
  sb->start = 0;
}
112
113
114 /* Note that channel is alive */
alive(server * s)115 static void alive(server *s)
116 {
117 if (s) {
118 s->active = task_now();
119 }
120 }
121
122
123
srv_buf_capacity(srv_buf * sb)124 static u_int srv_buf_capacity(srv_buf *sb)
125 {
126 return sizeof(sb->buf);
127 }
128
srv_buf_free_space(srv_buf * sb)129 static u_int srv_buf_free_space(srv_buf *sb)
130 {
131 return ((u_int)sizeof(sb->buf)) - sb->n;
132 }
133
srv_buf_buffered(srv_buf * sb)134 static u_int srv_buf_buffered(srv_buf *sb)
135 {
136 return sb->n - sb->start;
137 }
138
/* Address of the next byte to be extracted */
static char *srv_buf_extract_ptr(srv_buf *sb)
{
  return sb->buf + sb->start;
}
143
144
/* Address where the next inserted byte will be stored */
static char *srv_buf_insert_ptr(srv_buf *sb)
{
  return sb->buf + sb->n;
}
149
150
/* Consume len bytes: move the extract offset forward (no bounds check) */
static inline void advance_extract_ptr(srv_buf *sb, u_int len)
{
  sb->start = sb->start + len;
}
155
get_srv_buf(srv_buf * sb,char * data,u_int len)156 static u_int get_srv_buf(srv_buf *sb, char *data, u_int len)
157 {
158 if(len > srv_buf_buffered(sb)){
159 len = srv_buf_buffered(sb);
160 }
161
162 memcpy(data, srv_buf_extract_ptr(sb), len);
163 advance_extract_ptr(sb, len);
164 return len;
165 }
166
/* Account for len newly stored bytes: move the insert offset forward */
static inline void advance_insert_ptr(srv_buf *sb, u_int len)
{
  sb->n = sb->n + len;
}
171
put_srv_buf(srv_buf * sb,char * data,u_int len)172 static u_int put_srv_buf(srv_buf *sb, char *data, u_int len)
173 {
174 assert(sb->n + len <= sizeof(sb->buf));
175 memcpy(srv_buf_insert_ptr(sb), data, len);
176 advance_insert_ptr(sb, len);
177 return len;
178 }
179
180
/*
  Task: write everything buffered in s->out_buf to the connection of s.

  The number of buffered bytes is saved in the task environment and the
  buffer is reset BEFORE the write is attempted, so the buffered data is
  considered consumed whatever the outcome of the write.

  If the connection has no valid fd the task fails (TASK_FAIL). Otherwise
  the task returns the byte count reported by task_write (0 if nothing was
  buffered); a count <= 0 after a write shuts the connection down.

  s    server whose output buffer is flushed
  ret  task result slot; presumably filled in by TASK_RETURN/TASK_FAIL
       (macros not visible here - confirm in task.h)
*/
int flush_srv_buf(server *s, int64_t *ret)
{
  DECL_ENV
    u_int buflen;
  END_ENV;

  TASK_BEGIN
  /* Remember how much to send, then mark the buffer empty */
  ep->buflen = s->out_buf.n;
  reset_srv_buf(&s->out_buf);
  if (s->con.fd >= 0) {
    int64_t sent = 0;
    if (ep->buflen) {
      /* DBGOUT(FN; PTREXP(stack); NDBG(ep->buflen, u)); */
      /* LOCK_FD(s->con.fd, 'w'); */
      /* The bytes are still in out_buf.buf; reset only cleared the offsets */
      TASK_CALL(task_write(&s->con, s->out_buf.buf, ep->buflen, &sent));
      /* UNLOCK_FD(s->fd, 'w'); */
      if (sent <= 0) {
        /* Write failed or wrote nothing: give up on this connection */
        shutdown_connection(&s->con);
      }
    }
    TASK_RETURN(sent);
  } else {
    /* Not connected */
    TASK_FAIL;
  }

  FINALLY
  TASK_END;
}
209
/*
  Task: send message p to node `to` through server s.

  - A message addressed to its own sender (to == p->from) is not put on the
    wire: it is handed directly to dispatch_op() and the task returns
    sizeof(*p).
  - Otherwise the message is serialized for the protocol version of the
    connection and either appended to s->out_buf or, if it does not fit even
    after flushing the buffer, written directly with task_write.
  - The task fails if s has no valid fd, or if the fd became invalid during
    a flush or a direct write.

  Per-operation counters send_count/send_bytes are updated and s is marked
  alive for every serialized message with non-zero length.

  s    destination server
  p    message to send; p->to is overwritten with `to`
  to   destination node number
  ret  task result slot; presumably filled in by TASK_RETURN/TASK_FAIL
       (macros not visible here - confirm in task.h)

  NOTE(review): the return value of serialize_msg() is ignored here; only
  the resulting buffer length is tested. Confirm that a failed
  serialization cannot leave a non-empty buffer that then gets sent.
*/
static int _send_msg(server *s, pax_msg *p, node_no to, int64_t *ret)
{
  DECL_ENV
    uint32_t buflen;
    char *buf;
  END_ENV;

  TASK_BEGIN
  p->to = to;
  MAY_DBG(FN; PTREXP(stack); PTREXP(s); PTREXP(p); NDBG(s->con.fd, d));
  MAY_DBG(FN;
          STREXP(s->srv);
          NDBG(s->port, d);
          NDBG(task_now(), f);
          COPY_AND_FREE_GOUT(dbg_pax_msg(p));
          );
  if (to == p->from) {
    /* Message to self: dispatch locally, bypassing the network */
    MAY_DBG(FN;
            COPY_AND_FREE_GOUT(dbg_pax_msg(p)); );
    dispatch_op(find_site_def(p->synode), p, NULL);
    TASK_RETURN(sizeof(*p));
  } else {
    if (s->con.fd >= 0) {
      int64_t sent= 0;
      /* LOCK_FD(s->con.fd, 'w'); */
      serialize_msg(p, s->con.x_proto, &ep->buflen, &ep->buf);
      if(ep->buflen){
        /* Not enough space? Flush the buffer */
        if (ep->buflen > srv_buf_free_space(&s->out_buf)) {
          TASK_CALL(flush_srv_buf(s, ret));
          if (s->con.fd < 0) {
            /* Connection was shut down during the flush */
            TASK_FAIL;
          }
          /* Still not enough? Message must be huge, send without buffering */
          if (ep->buflen > srv_buf_free_space(&s->out_buf)) {
            DBGOUT(FN; STRLIT("task_write"));
            TASK_CALL(task_write(&s->con, ep->buf, ep->buflen, &sent));
            if (s->con.fd < 0) {
              TASK_FAIL;
            }
          } else { /* Buffer the write */
            put_srv_buf(&s->out_buf, ep->buf, ep->buflen);
            sent = ep->buflen;
          }
        } else { /* Buffer the write */
          put_srv_buf(&s->out_buf, ep->buf, ep->buflen);
          sent = ep->buflen;
        }
        /* Statistics are kept per message operation */
        send_count[p->op]++;
        send_bytes[p->op] += ep->buflen;
        alive(s); /* Note activity */
        /* DBGOUT(STRLIT("sent message "); STRLIT(pax_op_to_str(p->op)); */
        /* NDBG(p->from,d); NDBG(p->to,d); */
        /* SYCEXP(p->synode);  */
        /* BALCEXP(p->proposal)); */
        X_FREE(ep->buf);
        /* UNLOCK_FD(s->con.fd, 'w'); */
        if (sent <= 0) {
          shutdown_connection(&s->con);
        }
      }
      TASK_RETURN(sent);
    } else
      TASK_FAIL;
  }
  FINALLY
  /* Release the serialization buffer on the paths that did not free it above */
  if (ep->buf)
    X_FREE(ep->buf);
  TASK_END;
}
281
282
/*
  Store protocol version proto_vers in the version field of the message
  header that starts at buf. The field is located with VERS_PTR and written
  with put_32 (both defined outside this file).
*/
void write_protoversion(unsigned char *buf, xcom_proto proto_vers)
{
  put_32(VERS_PTR(buf), proto_vers);
}
287
read_protoversion(unsigned char * p)288 xcom_proto read_protoversion(unsigned char *p)
289 {
290 return get_32(p);
291 }
292
/*
  Compare the protocol version found in a message with the negotiated one.
  Returns 1 when they are equal, otherwise logs both and returns 0.
*/
int check_protoversion(xcom_proto x_proto, xcom_proto negotiated)
{
  if (x_proto == negotiated) {
    return 1;
  }

  DBGOUT(FN; STRLIT(" found XCOM protocol version ");
         NDBG(x_proto,d); STRLIT(" need version ");
         NDBG(negotiated,d); );
  return 0;
}
304
305
/*
  Task: send a protocol negotiation message on connection con.

  The message consists of a header only (MSG_HDR_SIZE bytes) carrying the
  protocol version x_proto, message type x_type and tag, with a payload
  length of 0. The tag is also remembered in con->snd_tag.

  The task fails if con has no valid fd, either on entry or after the write.
  Otherwise it returns the byte count reported by task_write; a count <= 0
  shuts the connection down first.

  ret  task result slot; presumably filled in by TASK_RETURN/TASK_FAIL
       (macros not visible here - confirm in task.h)
*/
int send_proto(connection_descriptor *con, xcom_proto x_proto,
               x_msg_type x_type, unsigned int tag, int64_t *ret)
{
  DECL_ENV
    char buf[MSG_HDR_SIZE];
  END_ENV;

  TASK_BEGIN
  if (con->fd >= 0) {
    con->snd_tag = tag;
    /* Build the header in the task environment so it survives yields */
    write_protoversion(VERS_PTR((unsigned char*) ep->buf), x_proto);
    put_header_1_0((unsigned char*) ep->buf, 0, x_type, tag);

    {
      int64_t sent;

      TASK_CALL(task_write(con, ep->buf, MSG_HDR_SIZE, &sent));
      if (con->fd < 0) {
        /* Connection went away while writing */
        TASK_FAIL;
      }
      if (sent <= 0) {
        shutdown_connection(con);
      }
      TASK_RETURN(sent);
    }
  } else {
    TASK_FAIL;
  }
  FINALLY

  TASK_END;
}
339
/*
  Run the XDR routine xdrfunc over a memory buffer.

  x_proto   protocol version, made available to xdrfunc through the
            x_public field of the XDR stream
  buff      memory to encode into or decode from
  bufflen   size of buff in bytes
  xdrfunc   XDR filter to apply
  xdrdata   object passed to xdrfunc
  op        XDR operation (e.g. XDR_ENCODE or XDR_DECODE)

  Returns the result of xdrfunc, or 0 if the memory stream could not be
  set up (x_ops left NULL by xdrmem_create).
*/
int apply_xdr(xcom_proto x_proto, gpointer buff, uint32_t bufflen,
              xdrproc_t xdrfunc, void *xdrdata,
              enum xdr_op op)
{
  XDR xdr;
  int MY_ATTRIBUTE ((unused)) s = 0;

  /* Lets us detect a stream that xdrmem_create did not initialize */
  xdr.x_ops = NULL;
  xdrmem_create(&xdr, buff, bufflen, op);
  /*
    Mac OSX changed the xdrproc_t prototype to take
    three parameters instead of two.

    The argument is that it has the potential to break
    the ABI due to compiler optimizations.

    The recommended value for the third parameter is
    0 for those that are not making use of it (which
    is the case). This will keep this code cross-platform
    and cross-version compatible.
  */
  if (xdr.x_ops){
    xdr.x_public = (caddr_t)&x_proto; /* Supply protocol version in user field of xdr */
    s = xdrfunc(&xdr, xdrdata, 0);
    /*
      Destroy only an initialized stream: xdr_destroy dispatches through
      x_ops, so calling it while x_ops is NULL would dereference NULL.
    */
    xdr_destroy(&xdr);
  }
  return s;
}
368
369
370 #if TASK_DBUG_ON
dump_header(char * buf)371 static void dump_header(char *buf)
372 {
373 char *end = buf + MSG_HDR_SIZE;
374 GET_GOUT;
375 STRLIT("message header ");
376 PTREXP(buf);
377 while (buf < end) {
378 NPUT(*buf, x);
379 buf++;
380 }
381 PRINT_GOUT;
382 FREE_GOUT;
383 }
384 #endif
385
386 void dbg_app_data(app_data_ptr a);
387
388 #ifdef OLD_XDR
389 #define const
390 #endif
391
/* ARGSUSED */
/*
  x_putlong operation of the size-counting XDR stream used by
  xdr_proto_sizeof: nothing is stored, the running size in x_handy is
  just increased by one XDR unit. The parameter type depends on the
  platform's XDR headers, hence the conditional signature.
*/
static bool_t
x_putlong (XDR *xdrs,
#if defined(__APPLE__) && defined(__LP64__)
           const int *intp MY_ATTRIBUTE((unused))
#elif defined(X_PUTLONG_NOT_USE_CONST)
           long *longp MY_ATTRIBUTE((unused))
#else
           const long *longp MY_ATTRIBUTE((unused))
#endif
           )
{
  xdrs->x_handy += BYTES_PER_XDR_UNIT;
  return TRUE;
}
407
/* ARGSUSED */
/*
  x_putbytes operation of the size-counting XDR stream: the bytes are
  ignored and the running size in x_handy grows by len. Two variants are
  provided because the parameter types differ under OLD_XDR.
*/
#ifdef OLD_XDR
static bool_t
x_putbytes (XDR *xdrs, char *bp MY_ATTRIBUTE((unused)), int len)
{
  xdrs->x_handy += len;
  return TRUE;
}
#else
static bool_t
x_putbytes (XDR *xdrs, const char *bp MY_ATTRIBUTE((unused)), u_int len)
{
  xdrs->x_handy += len;
  return TRUE;
}

#endif
425
426
/*
  x_getpostn operation of the size-counting XDR stream: the "position" is
  the number of bytes counted so far (x_handy). The const-ness of the
  parameter depends on the platform's XDR headers.
*/
static u_int
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(HAVE_TIRPC)
x_getpostn(XDR *xdrs)
#else
x_getpostn (const XDR *xdrs)
#endif
{
#ifdef OLD_XDR
  /* Explicit conversion; a later comment in this file says x_handy is int for old XDR */
  return (u_int)(xdrs->x_handy);
#else
  return xdrs->x_handy;
#endif
}
440
441 /* ARGSUSED */
442 static bool_t
x_setpostn(XDR * xdrs MY_ATTRIBUTE ((unused)),u_int len MY_ATTRIBUTE ((unused)))443 x_setpostn (XDR *xdrs MY_ATTRIBUTE((unused)), u_int len MY_ATTRIBUTE((unused)))
444 {
445 /* This is not allowed */
446 return FALSE;
447 }
448
/* Element type returned by the x_inline operation on this platform */
#ifdef HAVE_RPC_INLINE_T
#define INLINE_T rpc_inline_t
#else
#define INLINE_T int32_t
#endif

/*
  x_inline operation of the size-counting XDR stream.

  Hands out a scratch area of at least len bytes and adds len to the
  running size in x_handy. The scratch area lives in x_private and its
  size is kept, cast to a pointer, in x_base; it is reused while requests
  are smaller than the recorded size and reallocated otherwise.

  Returns NULL for a zero-length request, when the stream is not encoding,
  or when allocation fails (x_base is then reset to 0).

  Two variants exist because len is int under OLD_XDR and u_int otherwise.
*/
#ifdef OLD_XDR
static INLINE_T *
x_inline (XDR *xdrs, int len)
{
  if (len == 0)
    return NULL;
  if (xdrs->x_op != XDR_ENCODE)
    return NULL;
  if (len < (int) (long int) xdrs->x_base) {
    /* x_private was already allocated */
    xdrs->x_handy += len;
    return (INLINE_T * ) xdrs->x_private;
  } else {
    /* Free the earlier space and allocate new area */
    free (xdrs->x_private);
    if ((xdrs->x_private = (caddr_t) malloc ((size_t)len)) == NULL) {
      xdrs->x_base = 0;
      return NULL;
    }
    /* Remember the size of the scratch area in x_base */
    xdrs->x_base = (void * ) (long) len;
    xdrs->x_handy += len;
    return (INLINE_T * ) xdrs->x_private;
  }
}
#else
static INLINE_T *
x_inline (XDR *xdrs, u_int len)
{
  if (len == 0)
    return NULL;
  if (xdrs->x_op != XDR_ENCODE)
    return NULL;
  if (len < (u_int) (long int) xdrs->x_base) {
    /* x_private was already allocated */
    xdrs->x_handy += len;
    return (INLINE_T * ) xdrs->x_private;
  } else {
    /* Free the earlier space and allocate new area */
    free (xdrs->x_private);
    if ((xdrs->x_private = (caddr_t) malloc (len)) == NULL) {
      xdrs->x_base = 0;
      return NULL;
    }
    /* Remember the size of the scratch area in x_base */
    xdrs->x_base = (void * ) (long) len;
    xdrs->x_handy += len;
    return (INLINE_T * ) xdrs->x_private;
  }
}
#endif

#undef INLINE_T
506 /* purecov: begin deadcode */
/*
  Placeholder installed as the x_getlong operation of the size-counting
  XDR stream, which only encodes. Always fails.
*/
static bool_t
/* to stop ANSI-C compiler from complaining */
#if defined(__APPLE__) && defined(__LP64__)
dummyfunc1 (XDR *xdr MY_ATTRIBUTE((unused)), int *ip MY_ATTRIBUTE((unused)))
#else
dummyfunc1 (XDR *xdr MY_ATTRIBUTE((unused)), long *lp MY_ATTRIBUTE((unused)))
#endif
{
  /* Always return FALSE/NULL, as the case may be */
  return FALSE;
}
518
/*
  Placeholder installed as the x_getbytes operation of the size-counting
  XDR stream, which only encodes. Always fails. The length parameter is
  int under OLD_XDR and u_int otherwise.
*/
static bool_t
dummyfunc2 (XDR *xdr MY_ATTRIBUTE((unused)),
#ifdef OLD_XDR
            caddr_t ct MY_ATTRIBUTE((unused)),
            int i MY_ATTRIBUTE((unused)))
#else
            caddr_t ct MY_ATTRIBUTE((unused)),
            u_int u MY_ATTRIBUTE((unused)))
#endif
{
  /* Always return FALSE/NULL, as the case may be */
  return FALSE;
}
532
#ifdef HAVE_XDR_OPS_X_GETINT32
/*
  Placeholder installed as the x_getint32 operation of the size-counting
  XDR stream, which only encodes. Always fails.
*/
static bool_t
dummyfunc3 (XDR *xdr MY_ATTRIBUTE((unused)), int32_t *ip MY_ATTRIBUTE((unused)))
{
  return FALSE; /* Always return FALSE/NULL, as the case may be */
}
#endif
542
543 static void
x_destroy(XDR * xdrs)544 x_destroy (XDR *xdrs)
545 {
546 xdrs->x_handy = 0;
547 xdrs->x_base = 0;
548 if (xdrs->x_private)
549 {
550 free (xdrs->x_private);
551 xdrs->x_private = NULL;
552 }
553 return;
554 }
555 /* purecov: end */
#ifdef HAVE_XDR_OPS_X_PUTINT32
/*
  x_putint32 operation of the size-counting XDR stream: nothing is stored,
  the running size in x_handy grows by one XDR unit. Only compiled when
  the platform's xdr_ops has this member.
*/
static bool_t
#ifdef OLD_XDR
x_putint32 (XDR *xdrs, int32_t *int32p MY_ATTRIBUTE((unused)))
#else
x_putint32 (XDR *xdrs, const int32_t *int32p MY_ATTRIBUTE((unused)))
#endif
{
  xdrs->x_handy += BYTES_PER_XDR_UNIT;
  return TRUE;
}
#endif
568
569 static uint64_t
xdr_proto_sizeof(xcom_proto x_proto,xdrproc_t func,void * data)570 xdr_proto_sizeof (xcom_proto x_proto, xdrproc_t func, void *data)
571 {
572 XDR x;
573 struct xdr_ops ops;
574 bool_t stat;
575
576
577 memset(&ops, 0, sizeof(struct xdr_ops));
578 ops.x_putlong = x_putlong;
579 ops.x_putbytes = x_putbytes;
580 ops.x_inline = x_inline;
581 ops.x_getpostn = x_getpostn;
582 ops.x_setpostn = x_setpostn;
583 ops.x_destroy = x_destroy;
584
585 #ifdef HAVE_XDR_OPS_X_PUTINT32
586 ops.x_putint32 = x_putint32;
587 #endif
588 /* the other harmless ones */
589 ops.x_getlong = dummyfunc1;
590 ops.x_getbytes = dummyfunc2;
591 #ifdef HAVE_XDR_OPS_X_GETINT32
592 ops.x_getint32 = dummyfunc3;
593 #endif
594 x.x_op = XDR_ENCODE;
595 x.x_ops = &ops;
596 x.x_handy = 0;
597 x.x_private = (caddr_t) NULL;
598 x.x_base = (caddr_t) 0;
599 x.x_public = (caddr_t)&x_proto;
600
601 /*
602 Mac OSX changed the xdrproc_t prototype to take
603 three parameters instead of two.
604
605 The argument is that it has the potential to break
606 the ABI due to compiler optimizations.
607
608 The recommended value for the third parameter is
609 0 for those that are not making use of it (which
610 is the case). This will keep this code cross-platform
611 and cross-version compatible.
612 */
613 stat = func (&x, data, 0);
614 free (x.x_private);
615 // x_handy is int type for old XDR
616 return stat == TRUE ? (uint64_t)x.x_handy : 0;
617 }
618
619 #ifdef OLD_XDR
620 #undef const
621 #endif
622
623 /* Return 0 if it fails to serialize the message, otherwise 1 is returned. */
serialize(void * p,xcom_proto x_proto,uint32_t * out_len,xdrproc_t xdrfunc,char ** out_buf)624 static int serialize(void *p, xcom_proto x_proto, uint32_t *out_len,
625 xdrproc_t xdrfunc, char **out_buf)
626 {
627 unsigned char *buf = NULL;
628 uint64_t msg_buflen = 0;
629 uint64_t tot_buflen = 0;
630 unsigned int tag = 0;
631 x_msg_type x_type = x_normal;
632 int retval = 0;
633
634 /* Find length of serialized message */
635 msg_buflen = xdr_proto_sizeof(x_proto, xdrfunc, p);
636 tot_buflen = SERIALIZED_BUFLEN(msg_buflen);
637 MAY_DBG(FN; NDBG(msg_buflen, llu); NDBG(tot_buflen, llu));
638 /*
639 Paxos message size is limited in UINT32 range. It will return an
640 error if the serialized message is bigger than UINT32_MAX bytes.
641 */
642 if (tot_buflen > UINT32_MAX)
643 {
644 G_ERROR("Serialized message exceeds 4GB limit.");
645 return retval;
646 }
647
648 /*
649 Allocate space for version number, length field, type, tag, and serialized
650 message. Explicit type case suppress the warnings on 32bits.
651 */
652 buf = calloc(1, (size_t)tot_buflen);
653 if (buf) {
654 /* Write protocol version */
655 write_protoversion(buf, x_proto);
656
657 /* Serialize message */
658 retval = apply_xdr(x_proto, MSG_PTR(buf), (uint32_t)msg_buflen,
659 xdrfunc,
660 p, XDR_ENCODE);
661 if(retval){
662 /* Serialize header into buf */
663 put_header_1_0(buf, (uint32_t)msg_buflen, x_type, tag);
664 }
665
666 *out_len = (uint32_t)tot_buflen;
667 *out_buf = (char * )buf;
668 }
669 MAY_DBG(FN; NDBG(*out_len, u); PTREXP(*out_buf);
670 dump_header(*out_buf));
671 return retval;
672 }
673
674
675 /* Version 1 has no new messages, only modified, so all should be sent */
old_proto_knows(xcom_proto x_proto MY_ATTRIBUTE ((unused)),pax_op op MY_ATTRIBUTE ((unused)))676 static inline int old_proto_knows(xcom_proto x_proto MY_ATTRIBUTE((unused)),
677 pax_op op MY_ATTRIBUTE((unused)))
678 {
679 return 1;
680 }
681
/*
  Serialize Paxos message p for protocol version x_proto.
  *buf and *buflen are cleared first and are only filled in by serialize().
  Returns 1 on success, 0 otherwise.
*/
int serialize_msg(pax_msg *p, xcom_proto x_proto, uint32_t *buflen, char **buf)
{
  *buf = 0;
  *buflen = 0;

  if (!old_proto_knows(x_proto, p->op)) {
    return 0;
  }
  return serialize((void *)p, x_proto, buflen, (xdrproc_t)xdr_pax_msg, buf) != 0;
}
690
/*
  Decode buflen bytes at buf into Paxos message p.
  If decoding fails, whatever was allocated inside p is released with
  my_xdr_free. Returns the result of apply_xdr (0 on failure).
*/
int deserialize_msg(pax_msg *p, xcom_proto x_proto,  char *buf, uint32_t buflen)
{
  int const apply_ok = apply_xdr(x_proto, buf, buflen,
                                 (xdrproc_t)xdr_pax_msg,
                                 (void *)p, XDR_DECODE);

  if (apply_ok) {
    return apply_ok;
  }
  my_xdr_free((xdrproc_t)xdr_pax_msg, (char *)p);
  return apply_ok;
}
702
/* Better checksum: lookup table with one entry per byte value */
static uint32_t crc_table[256];

/*
  Fill crc_table for the reflected polynomial 0x82F63B78, processing each
  byte value bit by bit. Must run before crc32c_hash is used.
*/
void init_crc32c()
{
  uint32_t byte;

  for (byte = 0; byte < 256; byte++) {
    uint32_t rem = byte;
    int bits_left = 8;

    while (bits_left-- > 0) {
      uint32_t const low_bit = rem & 1;

      rem >>= 1;
      if (low_bit) {
        rem ^= 0x82F63B78;
      }
    }
    crc_table[byte] = rem;
  }
}
718
719
720 #define CRC32CSTART 0xFFFFFFFF
721
722 /* purecov: begin deadcode */
crc32c_hash(char * buf,char * end)723 uint32_t crc32c_hash(char *buf, char *end)
724 {
725 uint32_t c = CRC32CSTART;
726 unsigned char *p = (unsigned char*)buf;
727 unsigned char *e = (unsigned char*)end;
728 for (; p < e; p++) {
729 c = crc_table[(c ^ (*p)) & 0xFF] ^ (c >> 8);
730 }
731 return c ^ 0xFFFFFFFF;
732 }
733 /* purecov: end */
734
735 /* {{{ Paxos servers (nodes) */
736
/*
  Array of servers, only maxservers entries actually used.
  The used entries are kept dense at the front: addsrv appends at index
  maxservers and rmsrv moves the last entry into the freed slot.
*/
static server *all_servers[SERVER_MAX];
static int maxservers = 0;
740
741 /* Create a new server */
742 static server *
mksrv(char * srv,xcom_port port)743 mksrv(char *srv, xcom_port port)
744 {
745 server * s;
746
747 s = calloc(1, sizeof (* s));
748
749 DBGOUT(FN; PTREXP(s); STREXP(srv));
750 if (s == 0) {
751 g_critical("out of memory");
752 abort();
753 }
754 s->garbage = 0;
755 s->invalid = 0;
756 s->refcnt = 0;
757 s->srv = srv;
758 s->port = port;
759 reset_connection(&s->con);
760 s->active = 0.0;
761 s->detected = 0.0;
762 channel_init(&s->outgoing, type_hash("msg_link"));
763 DBGOUT(FN; STREXP(srv); NDBG(port,d));
764 if (xcom_mynode_match(srv, port)) { /* Short-circuit local messages */
765 DBGOUT(FN; STRLIT("creating local sender"); STREXP(srv); NDBG(port,d));
766 s->sender = task_new(local_sender_task, void_arg(s), "local_sender_task", XCOM_THREAD_DEBUG);
767 }else{
768 s->sender = task_new(sender_task, void_arg(s), "sender_task", XCOM_THREAD_DEBUG);
769 DBGOUT(FN; STRLIT("creating sender and reply_handler"); STREXP(srv); NDBG(port,d));
770 s->reply_handler = task_new(reply_handler_task, void_arg(s), "reply_handler_task", XCOM_THREAD_DEBUG);
771 }
772 reset_srv_buf(&s->out_buf);
773 /*
774 Keep the server from being freed if the acceptor_learner_task calls
775 srv_unref on the server before the {local_,}server_task and
776 reply_handler_task begin.
777 */
778 srv_ref(s);
779 return s;
780 }
781
782
addsrv(char * srv,xcom_port port)783 static server *addsrv(char *srv, xcom_port port)
784 {
785 server * s = mksrv(srv, port);
786 assert(all_servers[maxservers] == 0);
787 assert(maxservers < SERVER_MAX);
788 all_servers[maxservers] = s;
789 MAY_DBG(FN; PTREXP(all_servers[maxservers]); STREXP(all_servers[maxservers]->srv); NDBG(all_servers[maxservers]->port, d); NDBG(maxservers, d));
790 maxservers++;
791 return s;
792 }
793
794
rmsrv(int i)795 static void rmsrv(int i)
796 {
797 assert(all_servers[i]);
798 assert(maxservers > 0);
799 assert(i < maxservers);
800 MAY_DBG(FN; PTREXP(all_servers[i]); STREXP(all_servers[i]->srv); NDBG(all_servers[i]->port, d); NDBG(i, d));
801 maxservers--;
802 all_servers[i] = all_servers[maxservers];
803 all_servers[maxservers] = 0;
804 }
805
806
init_collect()807 static void init_collect()
808 {
809 int i;
810
811 for (i = 0; i < maxservers; i++) {
812 assert(all_servers[i]);
813 all_servers[i]->garbage = 1;
814 }
815 }
816
817
818 extern void get_all_site_defs(site_def ***s, uint32_t *n);
819
/* Clear the garbage flag of every server referenced by site */
static void mark_site_servers(site_def *site)
{
  u_int i = 0;

  while (i < get_maxnodes(site)) {
    server *s = site->servers[i];
    assert(s);
    s->garbage = 0;
    i++;
  }
}
829
830
mark()831 static void mark()
832 {
833 site_def * *site;
834 uint32_t n;
835 uint32_t i;
836
837 get_all_site_defs(&site, &n);
838
839 for (i = 0; i < n; i++) {
840 if (site[i]) {
841 mark_site_servers(site[i]);
842 }
843 }
844 }
845
846
sweep()847 static void sweep()
848 {
849 int i = 0;
850 while (i < maxservers) {
851 server *s = all_servers[i];
852 assert(s);
853 if (s->garbage) {
854 DBGOUT(FN; STREXP(s->srv));
855 shut_srv(s);
856 rmsrv(i);
857 } else {
858 i++;
859 }
860 }
861 }
862
863
garbage_collect_servers()864 void garbage_collect_servers()
865 {
866 DBGOUT(FN);
867 init_collect();
868 mark();
869 sweep();
870 }
871
872
873 /* Free a server */
freesrv(server * s)874 static void freesrv(server *s)
875 {
876 X_FREE(s->srv);
877 X_FREE(s);
878 }
879
880
/*
  Time of the last recorded activity of node i in site s,
  or 0.0 if the site has no server object for that node.
*/
double server_active(site_def const *s, node_no i)
{
  server const *node_srv = s->servers[i];

  return node_srv ? node_srv->active : 0.0;
}
888
889
890 /* Shutdown server */
shut_srv(server * s)891 static void shut_srv(server *s)
892 {
893 if (!s)
894 return;
895 DBGOUT(FN; PTREXP(s); STREXP(s->srv));
896
897 shutdown_connection(&s->con);
898
899 /* Tasks will free the server object when they terminate */
900 if (s->sender)
901 task_terminate(s->sender);
902 if (s->reply_handler)
903 task_terminate(s->reply_handler);
904
905 // Allow the server to be freed. This unref pairs with the ref from mksrv.
906 srv_unref(s);
907 }
908
909
/* Take a reference on s. Returns the new reference count. */
int srv_ref(server *s)
{
  assert(s->refcnt >= 0);
  return ++s->refcnt;
}
916
917
/*
  Drop a reference on s. When the count reaches zero the server is freed
  and 0 is returned; otherwise the remaining count is returned.
*/
int srv_unref(server *s)
{
  assert(s->refcnt >= 0);
  if (--s->refcnt != 0) {
    return s->refcnt;
  }
  /* Last reference gone; s must not be used after this */
  freesrv(s);
  return 0;
}
928
929
930 /* }}} */
931
/*
  Task: listen for connections on socket and create a handler task.

  arg carries the listening file descriptor. Each accepted socket is first
  offered to the accept callback, if one is installed; a socket rejected by
  the callback is closed and handled like a failed accept. Every accepted
  socket gets its own acceptor_learner_task. After a failed or rejected
  accept the task waits 0.1 before retrying.

  The loop ends when xcom_shutdown is set. The listening socket is closed
  when the task finishes.

  NOTE(review): the accept callback is invoked before ep->cfd is checked
  against -1 - confirm whether callbacks are expected to cope with the
  value left by a failed accept.
*/
int tcp_server(task_arg arg) {
  DECL_ENV
    int fd;      /* listening socket */
    int cfd;     /* most recently accepted socket, -1 if none */
    int refused; /* set when the last accept failed or was rejected */
  END_ENV;
  TASK_BEGIN
  ep->fd = get_int_arg(arg);
  ep->refused = 0;
  unblock_fd(ep->fd);
  DBGOUT(FN; NDBG(ep->fd, d););
  G_MESSAGE("XCom protocol version: %d", my_xcom_version);
  G_MESSAGE(
      "XCom initialized and ready to accept incoming connections on port %d",
      xcom_listen_port);
  do {
    TASK_CALL(accept_tcp(ep->fd, &ep->cfd));
    /* Callback to check that the file descriptor is accepted. */
    if (xcom_socket_accept_callback &&
        !xcom_socket_accept_callback(ep->cfd, get_site_def())) {
      shut_close_socket(&ep->cfd);
      ep->cfd = -1;
    }
    if(ep->cfd == -1){
      G_DEBUG("accept failed");
      ep->refused = 1;
      /* Back off before trying again */
      TASK_DELAY(0.1);
    } else {
      ep->refused = 0;
      DBGOUT(FN; NDBG(ep->cfd, d););
      task_new(acceptor_learner_task, int_arg(ep->cfd), "acceptor_learner_task",
               XCOM_THREAD_DEBUG);
    }
  } while (!xcom_shutdown && (ep->cfd >= 0 || ep->refused));
  FINALLY
  assert(ep->fd >= 0);
  shut_close_socket(&ep->fd);
  TASK_END;
}
972
973 #ifdef XCOM_HAVE_OPENSSL
/*
  Perform the client side of the SSL handshake on connection `con`
  (an lvalue of connection descriptor type) and verify the server
  certificate against `hostname`.

  Must be expanded inside a task body: it uses `stack`, TASK_YIELD and
  TERMINATE, and expects a local variable `ret` of type `result` as well as
  `client_ctx` to be in scope.

  While SSL_connect reports that it wants to read or write, the task waits
  for the corresponding I/O on con.fd and retries. If the fd becomes
  invalid meanwhile, if the handshake fails, or if
  ssl_verify_server_cert returns non-zero, the SSL state is freed, the
  connection is closed and the task is terminated. On success the
  connection is marked connected (CON_FD).

  NOTE(review): the result of SSL_new is not checked before use - confirm
  that it cannot be NULL here.
*/
#define SSL_CONNECT(con, hostname) {                                   \
  con.ssl_fd = SSL_new(client_ctx);                                    \
  SSL_set_fd(con.ssl_fd, con.fd);                                      \
  ERR_clear_error();                                                   \
  ret.val = SSL_connect(con.ssl_fd);                                   \
  ret.funerr = to_ssl_err(SSL_get_error(con.ssl_fd, ret.val));         \
  while (ret.val != SSL_SUCCESS && can_retry(ret.funerr)) {            \
    if (from_ssl_err(ret.funerr) == SSL_ERROR_WANT_READ){              \
      wait_io(stack, con.fd, 'r');                                     \
    }else if (from_ssl_err(ret.funerr) == SSL_ERROR_WANT_WRITE){       \
      wait_io(stack, con.fd, 'w');                                     \
    }else{                                                             \
      break;                                                           \
    }                                                                  \
    TASK_YIELD;                                                        \
    SET_OS_ERR(0);                                                     \
    if (con.fd < 0) {                                                  \
      ssl_free_con(&con);                                              \
      close_connection(&con);                                          \
      TERMINATE;                                                       \
    }                                                                  \
                                                                       \
    ERR_clear_error();                                                 \
    ret.val = SSL_connect(con.ssl_fd);                                 \
    ret.funerr = to_ssl_err(SSL_get_error(con.ssl_fd, ret.val));       \
  }                                                                    \
                                                                       \
  if (ret.val != SSL_SUCCESS) {                                        \
    ssl_free_con(&con);                                                \
    close_connection(&con);                                            \
    TERMINATE;                                                         \
  }else{                                                               \
    if (ssl_verify_server_cert(con.ssl_fd, hostname))                  \
    {                                                                  \
      ssl_free_con(&con);                                              \
      close_connection(&con);                                          \
      TERMINATE;                                                       \
    }                                                                  \
    set_connected(&con, CON_FD);                                       \
  }                                                                    \
}
1015 #endif
1016
/* Record the current task time as the moment s was last detected */
void server_detected(server *s)
{
  s->detected = task_now();
}
1018
/*
  Task: try to connect to another node.

  Opens a TCP connection to s->srv:s->port; connect_tcp stores the
  resulting fd in s->con.fd. If that fd is negative only a debug message is
  produced. Otherwise set_nodelay is called on the socket (when NAGLE is 0),
  the socket is unblocked, the SSL handshake is performed when SSL is
  compiled in and enabled, and finally the connection is marked connected
  and the server alive.

  A failed SSL handshake terminates the task from inside SSL_CONNECT.
*/
static int dial(server *s)
{
  DECL_ENV
    int dummy; /* not referenced in this function */
  END_ENV;

  TASK_BEGIN
  DBGOUT(FN; STRLIT(" dial "); NPUT(get_nodeno(get_site_def()), u);
         STRLIT(s->srv); NDBG(s->port, u));
  TASK_CALL(connect_tcp(s->srv, s->port, &s->con.fd));
  /* DBGOUT(FN; NDBG(s->con.fd,d);); */
  if (s->con.fd < 0) {
    DBGOUT(FN; STRLIT("could not dial "); STRLIT(s->srv); NDBG(s->port, u); );
  } else {
    if (NAGLE == 0) {
      set_nodelay(s->con.fd);
    }

    unblock_fd(s->con.fd);
#ifdef XCOM_HAVE_OPENSSL
    if (xcom_use_ssl()) {
      /* `ret` is the result variable SSL_CONNECT expects to find in scope */
      result ret = {0,0};
      SSL_CONNECT(s->con, s->srv);
    }
#endif
    DBGOUT(FN; STRLIT("connected to "); STRLIT(s->srv); NDBG(s->con.fd, d);
           NDBG(s->port, u));
    set_connected(&s->con, CON_FD);
    alive(s);
  }
  FINALLY
  TASK_END;
}
1053
1054
1055 /* Send message by putting it in the server queue */
send_msg(server * s,node_no from,node_no to,uint32_t group_id,pax_msg * p)1056 int send_msg(server *s, node_no from, node_no to, uint32_t group_id, pax_msg *p)
1057 {
1058 assert(p);
1059 assert(s);
1060 {
1061 msg_link * link = msg_link_new(p, to);
1062 alive(s); /* Note activity */
1063 MAY_DBG(FN; PTREXP(&s->outgoing);
1064 COPY_AND_FREE_GOUT(dbg_msg_link(link));
1065 );
1066 p->from = from;
1067 p->to = to;
1068 p->group_id = group_id;
1069 p->max_synode = get_max_synode();
1070 p->delivered_msg = get_delivered_msg();
1071 MAY_DBG(FN; PTREXP(p); STREXP(s->srv); NDBG(p->from, d); NDBG(p->to, d); NDBG(p->group_id, u));
1072 channel_put(&s->outgoing, &link->l);
1073 }
1074 return 0;
1075 }
1076
1077
/*
  Queue p for node `to` of site s, with this node as sender.
  Nothing is sent if p is NULL or the destination server is marked invalid.
  Always returns 0.
*/
static inline int _send_server_msg(site_def const *s, node_no to, pax_msg *p)
{
  server *dest;

  assert(s);
  assert(s->servers[to]);

  dest = s->servers[to];
  if (dest != NULL && dest->invalid == 0 && p != NULL) {
    send_msg(dest, s->nodeno, to, get_group_id(s), p);
  }
  return 0;
}
1087
1088
/* Public entry point for _send_server_msg: queue p for node `to` of site s */
int send_server_msg(site_def const *s, node_no to, pax_msg *p)
{
  int const status = _send_server_msg(s, to, p);

  return status;
}
1093
/*
  Queue p for nodes 0 .. max-1 of site s.
  Returns the result of the last send, or 0 if nothing was sent.
  dbg is only used for debug output.
*/
static inline int send_loop(site_def const *s, node_no max, pax_msg *p, const char *dbg MY_ATTRIBUTE((unused)))
{
  int retval = 0;
  node_no i;

  assert(s);
  if (!s) {
    return retval;
  }
  for (i = 0; i < max; i++) {
    MAY_DBG(FN; STRLIT(dbg); STRLIT(" "); NDBG(i, u); NDBG(max, u); PTREXP(p));
    retval = _send_server_msg(s, i, p);
  }
  return retval;
}
1107
1108
1109 /* Send to all servers in site */
send_to_all_site(site_def const * s,pax_msg * p,const char * dbg)1110 int send_to_all_site(site_def const *s, pax_msg *p, const char *dbg)
1111 {
1112 int retval = 0;
1113 retval = send_loop(s, get_maxnodes(s), p, dbg);
1114 return retval;
1115 }
1116
1117 /* Send to all servers */
send_to_all(pax_msg * p,const char * dbg)1118 int send_to_all(pax_msg *p, const char *dbg)
1119 {
1120 return send_to_all_site(find_site_def(p->synode), p, dbg);
1121 }
1122
1123
/*
  Queue p for every node of site s except this one (s->nodeno).
  When MAXACCEPT is defined, only the first MAXACCEPT nodes are considered.
  Returns the result of the last send, or 0 if nothing was sent.
  dbg is only used for debug output.
*/
static inline int send_other_loop(site_def const *s, pax_msg *p, const char *dbg MY_ATTRIBUTE((unused)))
{
  int retval = 0;
  node_no i;
#ifdef MAXACCEPT
  node_no max = MIN(get_maxnodes(s), MAXACCEPT);
#else
  node_no max;
  assert(s);
  max = get_maxnodes(s);
#endif
  for (i = 0; i < max; i++) {
    if (i == s->nodeno) {
      continue; /* Skip ourselves */
    }
    MAY_DBG(FN; STRLIT(dbg); STRLIT(" "); NDBG(i, u); NDBG(max, u); PTREXP(p));
    retval = _send_server_msg(s, i, p);
  }
  return retval;
}
1143
1144
1145 /* Send to other servers */
send_to_others(site_def const * s,pax_msg * p,const char * dbg)1146 int send_to_others(site_def const *s, pax_msg *p, const char *dbg)
1147 {
1148 int retval = 0;
1149 retval = send_other_loop(s, p, dbg);
1150 return retval;
1151 }
1152
1153 /* Send to some other live server, round robin */
send_to_someone(site_def const * s,pax_msg * p,const char * dbg MY_ATTRIBUTE ((unused)))1154 int send_to_someone(site_def const *s, pax_msg *p, const char *dbg MY_ATTRIBUTE((unused)))
1155 {
1156 int retval = 0;
1157 static node_no i = 0;
1158 node_no prev = 0;
1159 #ifdef MAXACCEPT
1160 node_no max = MIN(get_maxnodes(s), MAXACCEPT);
1161 #else
1162 node_no max;
1163 assert(s);
1164 max = get_maxnodes(s);
1165 #endif
1166 /* DBGOUT(FN; NDBG(max,u); NDBG(s->maxnodes,u)); */
1167 assert(max > 0);
1168 prev = i % max;
1169 i = (i + 1) % max;
1170 while (i != prev) {
1171 /* DBGOUT(FN; NDBG(i,u); NDBG(prev,u)); */
1172 if (i != s->nodeno && !may_be_dead(s->detected, i, task_now())) {
1173 MAY_DBG(FN; STRLIT(dbg); NDBG(i, u); NDBG(max, u); PTREXP(p));
1174 retval = _send_server_msg(s, i, p);
1175 break;
1176 }
1177 i = (i + 1) % max;
1178 }
1179 return retval;
1180 }
1181
1182
1183 #ifdef MAXACCEPT
1184 /* Send to all acceptors */
send_to_acceptors(pax_msg * p,const char * dbg)1185 int send_to_acceptors(pax_msg *p, const char *dbg)
1186 {
1187 site_def const *s = find_site_def(p->synode);
1188 int retval = 0;
1189 int i;
1190 retval = send_loop(s, MIN(MAXACCEPT, s->maxnodes), p, dbg);
1191 return retval;
1192 }
1193
1194
1195 #else
1196 /* Send to all acceptors */
send_to_acceptors(pax_msg * p,const char * dbg)1197 int send_to_acceptors(pax_msg *p, const char *dbg)
1198 {
1199 return send_to_all(p, dbg);
1200 }
1201
1202
1203 #endif
1204
1205 /* Used by :/int.*read_msg */
1206 /**
1207 Reads n bytes from connection rfd without buffering reads.
1208 
Each pass of the loop asks task_read() for the bytes that are still
missing, capped at INT_MAX per call, until all n bytes are stored in p.
A read of 0 bytes ends the task with TASK_RETURN(0), a negative count
ends it with TASK_FAIL, and a complete read ends it with TASK_RETURN(n).

1209 @param[in] rfd Pointer to open connection.
1210 @param[out] p Output buffer.
1211 @param[in] n Number of bytes to read.
1212 @param[in] s Server handed to server_detected() after every read that returned data. May be NULL.
1213 @param[out] ret Number of bytes read, or -1 if failure.
1214 
1215 @return
1216 @retval 0 if task should terminate.
1217 @retval 1 if it should continue.
1218 */
read_bytes(connection_descriptor const * rfd,char * p,uint32_t n,server * s,int64_t * ret)1219 static int read_bytes(connection_descriptor const * rfd, char *p, uint32_t n,
1220 server *s, int64_t *ret)
1221 {
/* Task environment: remaining byte count and current write position in p */
1222 DECL_ENV
1223 uint32_t left;
1224 char *bytes;
1225 END_ENV;
1226 
/* Result of the most recent task_read() call; not part of the task environment */
1227 int64_t nread = 0;
1228 
1229 TASK_BEGIN
1230 
1231 ep->left = n;
1232 ep->bytes = (char *)p;
1233 while (ep->left > 0) {
1234 MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, ll); NDBG(ep->left, u));
/* task_read() takes an int length, so a single request is limited to INT_MAX */
1235 TASK_CALL(task_read(rfd, ep->bytes,
1236 ep->left >= INT_MAX ? INT_MAX : (int)ep->left,
1237 &nread));
1238 MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, ll); NDBG(ep->left, u));
1239 if (nread == 0) {
/* Nothing was read: finish with result 0 */
1240 TASK_RETURN(0);
1241 } else if (nread < 0) {
1242 DBGOUT(FN; NDBG(nread, ll));
1243 TASK_FAIL;
1244 } else {
/* Partial or full read: advance past the bytes just received */
1245 ep->bytes += nread;
1246 ep->left -= (uint32_t)nread;
1247 if (s) server_detected(s);
1248 }
1249 }
1250 assert(ep->left == 0);
1251 TASK_RETURN(n);
1252 FINALLY
1253 TASK_END;
1254 }
1255
1256 /**
1257 Reads n bytes from connection rfd with buffering reads.
1258 
Bytes already held in buf are copied out first.  If the amount still
missing is at least srv_buf_capacity(buf), the rest is read straight
into p with read_bytes().  Otherwise buf is reset and refilled with
task_read(), and bytes are copied out of it until the request is
satisfied.

1259 @param[in] rfd Pointer to open connection.
1260 @param[in,out] buf Used for buffering reads.
1261 Originally initialized by caller, maintained by buffered_read_bytes.
1262 @param[out] p Output buffer.
1263 @param[in] n Number of bytes to read
1264 @param[in] s Server handed to server_detected() after a buffered read that returned data, or forwarded to read_bytes(). May be NULL.
1265 @param[out] ret Number of bytes read, or -1 if failure.
1266 
1267 @return
1268 @retval 0 if task should terminate.
1269 @retval 1 if it should continue.
1270 */
buffered_read_bytes(connection_descriptor const * rfd,srv_buf * buf,char * p,uint32_t n,server * s,int64_t * ret)1271 static int buffered_read_bytes(connection_descriptor const * rfd, srv_buf *buf, char *p, uint32_t n, server *s, int64_t *ret)
1272 {
/* Task environment: remaining byte count and current write position in p */
1273 DECL_ENV
1274 uint32_t left;
1275 char *bytes;
1276 END_ENV;
/* Number of bytes handed out by the latest get_srv_buf() call */
1277 uint32_t nget = 0;
1278 
1279 TASK_BEGIN
1280 ep->left = n;
1281 ep->bytes = (char *)p;
1282 
1283 /* First, try to get bytes from buffer */
1284 nget = get_srv_buf(buf, ep->bytes, n);
1285 ep->bytes += nget;
1286 ep->left -= nget;
1287 
1288 if(ep->left >= srv_buf_capacity(buf)){
1289 /* Too big, do direct read of rest */
1290 TASK_CALL(read_bytes(rfd, ep->bytes, ep->left, s, ret));
/* read_bytes() reports its outcome through *ret; zero or negative is treated as failure here */
1291 if(*ret <= 0){
1292 TASK_FAIL;
1293 }
1294 ep->left -= (uint32_t)(*ret);
1295 }else{
1296 /* Buffered read makes sense */
1297 while(ep->left > 0){
1298 int64_t nread = 0;
1299 /* Buffer is empty, reset and read */
1300 reset_srv_buf(buf);
1301 MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, d););
1302 
/* Ask for as much as the buffer can hold, not just what is missing */
1303 TASK_CALL(task_read(rfd, srv_buf_insert_ptr(buf),
1304 (int)srv_buf_free_space(buf), &nread));
1305 MAY_DBG(FN; NDBG(rfd->fd, d); NDBG(nread, d););
1306 if (nread == 0) {
1307 TASK_RETURN(0);
1308 } else if (nread < 0) {
1309 DBGOUT(FN; NDBG(nread, d));
1310 TASK_FAIL;
1311 }else{
1312 /* Update buffer to reflect number of bytes read */
1313 advance_insert_ptr(buf, (uint)nread);
/* Copy out what is needed; anything beyond that stays in buf */
1314 nget = get_srv_buf(buf, ep->bytes, ep->left);
1315 ep->bytes += nget;
1316 ep->left -= nget;
1317 if (s) server_detected(s);
1318 }
1319 }
1320 }
1321 assert(ep->left == 0);
1322 TASK_RETURN(n);
1323 FINALLY
1324 TASK_END;
1325 }
1326
/*
  Decode the fields of a message header buffer.

  *msgsize receives the 32 bit value at LENGTH_PTR(header_buf), *x_type
  the byte at index X_TYPE, and *tag the 16 bit value at
  X_TAG_PTR(header_buf).
*/
void get_header_1_0(unsigned char header_buf[], uint32_t *msgsize,
                    x_msg_type *x_type, unsigned int *tag)
{
  *msgsize = get_32(LENGTH_PTR(header_buf));
  *x_type = header_buf[X_TYPE];
  *tag = get_16(X_TAG_PTR(header_buf));
}
1334
/*
  Encode the fields of a message header buffer; the counterpart of
  get_header_1_0().

  msgsize is stored as a 32 bit value at LENGTH_PTR(header_buf), x_type
  in the byte at index X_TYPE, and tag as a 16 bit value at
  X_TAG_PTR(header_buf).
*/
void put_header_1_0(unsigned char header_buf[], uint32_t msgsize,
                    x_msg_type x_type, unsigned int tag)
{
  put_32(LENGTH_PTR(header_buf), msgsize);
  header_buf[X_TYPE] = x_type;
  put_16(X_TAG_PTR(header_buf), tag);
}
1342
1343 /* See also :/static .*read_bytes */
/*
  Read one complete message from rfd and deserialize it into p.

  Headers of MSG_HDR_SIZE bytes are read until one of type x_normal
  arrives.  Protocol negotiation headers seen before that are handled
  here:
  - x_version_req: rfd->x_proto is set from negotiate_protocol() and an
    x_version_reply carrying the same tag is sent with send_proto().
  - x_version_reply: if its tag equals rfd->snd_tag, rfd->x_proto is set
    to the version found in the header.
  For the x_normal header, msgsize bytes of payload are read into a
  temporary buffer, handed to deserialize_msg() and freed again.

  s is forwarded to read_bytes().  The task finishes with
  TASK_RETURN(ep->n), the value read_bytes() stored for the payload
  read, or with TASK_FAIL on a short header, a rejected protocol
  version, an allocation failure, a failed payload read or a failed
  deserialization.
*/
read_msg(connection_descriptor * rfd,pax_msg * p,server * s,int64_t * ret)1344 int read_msg(connection_descriptor *rfd, pax_msg *p, server *s, int64_t *ret)
1345 {
1346 int deserialize_ok = 0;
1347 
/* Task environment: read result, payload buffer and the decoded header fields */
1348 DECL_ENV
1349 int64_t n;
1350 char *bytes;
1351 unsigned char header_buf[MSG_HDR_SIZE];
1352 xcom_proto x_version;
1353 uint32_t msgsize;
1354 x_msg_type x_type;
1355 unsigned int tag;
1356 END_ENV;
1357 
1358 TASK_BEGIN
1359 do{
1360 ep->bytes = NULL;
1361 /* Read length field, protocol version, and checksum */
1362 ep->n = 0;
1363 TASK_CALL(read_bytes(rfd, (char*)ep->header_buf, MSG_HDR_SIZE, s, &ep->n));
1364 
1365 if (ep->n != MSG_HDR_SIZE) {
1366 DBGOUT(FN; NDBG(ep->n, u));
1367 TASK_FAIL;
1368 }
1369 
1370 /* Check the protocol version before doing anything else */
1371 ep->x_version = read_protoversion(VERS_PTR(ep->header_buf));
1372 get_header_1_0(ep->header_buf, &ep->msgsize, & ep->x_type, &ep->tag);
1373 if(ep->x_type == x_version_req){
1374 /* Negotiation request. See what we can offer */
1375 rfd->x_proto = negotiate_protocol(ep->x_version);
1376 DBGOUT(STRLIT("incoming connection will use protcol version ");
1377 NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
1378 ADD_EVENTS(
1379 add_event(string_arg("incoming connection will use protcol version"));
1380 add_event(string_arg(xcom_proto_to_str(rfd->x_proto)));
1381 );
/* Refuse anything newer than our own version */
1382 if(rfd->x_proto > my_xcom_version)
1383 TASK_FAIL;
1384 set_connected(rfd, CON_PROTO);
/* Answer with the negotiated version, echoing the tag of the request */
1385 TASK_CALL(send_proto(rfd, rfd->x_proto, x_version_reply, ep->tag, ret));
1386 } else if (ep->x_type == x_version_reply){
1387 /* Mark connection with negotiated protocol version */
/* A reply whose tag differs from rfd->snd_tag is ignored */
1388 if(rfd->snd_tag == ep->tag){
1389 rfd->x_proto = ep->x_version;
1390 DBGOUT(STRLIT("peer connection will use protcol version ");
1391 NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
1392 
1393 ADD_EVENTS(
1394 add_event(string_arg("peer connection will use protcol version"));
1395 add_event(string_arg( xcom_proto_to_str(rfd->x_proto)));
1396 );
1397 if(rfd->x_proto > my_xcom_version || rfd->x_proto == x_unknown_proto)
1398 TASK_FAIL;
1399 
1400 set_connected(rfd, CON_PROTO);
1401 }
1402 }
/* Keep reading headers until an ordinary message header shows up */
1403 }while(ep->x_type != x_normal);
1404 
1405 #ifdef XCOM_PARANOID
1406 assert(check_protoversion(ep->x_version, rfd->x_proto));
1407 #endif
1408 if (!check_protoversion(ep->x_version, rfd->x_proto)) {
1409 TASK_FAIL;
1410 }
1411 
1412 /* OK, we can grok this version */
1413 
1414 /* Allocate buffer space for message */
/* NOTE(review): msgsize is taken from the received header and no upper bound is applied here - confirm whether one is enforced elsewhere */
1415 ep->bytes = calloc(1, ep->msgsize);
1416 if(!ep->bytes){
1417 TASK_FAIL;
1418 }
1419 
1420 /* Read message */
1421 ep->n = 0;
1422 TASK_CALL(read_bytes(rfd, ep->bytes, ep->msgsize, s, &ep->n));
1423 
1424 if (ep->n > 0) {
1425 /* Deserialize message */
1426 deserialize_ok = deserialize_msg(p, rfd->x_proto, ep->bytes, ep->msgsize);
1427 MAY_DBG(FN; STRLIT(" deserialized message"));
1428 
1429 }
1430 /* Deallocate buffer */
/* Freed before the failure check so that both outcomes release the buffer */
1431 X_FREE(ep->bytes);
1432 if (ep->n <= 0 || !deserialize_ok) {
1433 DBGOUT(FN; NDBG(ep->n, d); NDBG(deserialize_ok,d));
1434 TASK_FAIL;
1435 }
1436 TASK_RETURN(ep->n);
1437 FINALLY
1438 TASK_END;
1439 }
1440
/*
  Buffered counterpart of read_msg(): read one complete message from rfd
  through the read buffer buf and deserialize it into p.

  Headers of MSG_HDR_SIZE bytes are read with buffered_read_bytes()
  until one of type x_normal arrives.  Protocol negotiation headers seen
  before that are handled here:
  - x_version_req: rfd->x_proto is set from negotiate_protocol() and an
    x_version_reply carrying the same tag is sent with send_proto().
  - x_version_reply: if its tag equals rfd->snd_tag, rfd->x_proto is set
    to the version found in the header.
  For the x_normal header, msgsize bytes of payload are read into a
  temporary buffer, handed to deserialize_msg() and freed again.

  s is forwarded to buffered_read_bytes().  The task finishes with
  TASK_RETURN(ep->n), the value stored for the payload read, or with
  TASK_FAIL on a short header, a rejected protocol version, an
  allocation failure, a failed payload read or a failed deserialization.
*/
buffered_read_msg(connection_descriptor * rfd,srv_buf * buf,pax_msg * p,server * s,int64_t * ret)1441 int buffered_read_msg(connection_descriptor *rfd, srv_buf *buf,
1442 pax_msg *p, server *s, int64_t *ret)
1443 {
1444 int deserialize_ok = 0;
1445 
/* Task environment: read result, payload buffer and the decoded header fields */
1446 DECL_ENV
1447 int64_t n;
1448 char *bytes;
1449 unsigned char header_buf[MSG_HDR_SIZE];
1450 xcom_proto x_version;
1451 uint32_t msgsize;
1452 x_msg_type x_type;
1453 unsigned int tag;
1454 #ifdef NOTDEF
1455 unsigned int check;
1456 #endif
1457 END_ENV;
1458 
1459 TASK_BEGIN
1460 do{
1461 ep->bytes = NULL;
1462 /* Read length field, protocol version, and checksum */
1463 ep->n = 0;
1464 TASK_CALL(buffered_read_bytes(rfd, buf, (char*)ep->header_buf, MSG_HDR_SIZE, s, &ep->n));
1465 
1466 if (ep->n != MSG_HDR_SIZE) {
1467 DBGOUT(FN; NDBG(ep->n, u));
1468 TASK_FAIL;
1469 }
1470 
1471 /* Check the protocol version before doing anything else */
1472 ep->x_version = read_protoversion(VERS_PTR(ep->header_buf));
1473 get_header_1_0(ep->header_buf, &ep->msgsize, & ep->x_type, &ep->tag);
1474 if(ep->x_type == x_version_req){
1475 /* Negotiation request. See what we can offer */
1476 rfd->x_proto = negotiate_protocol(ep->x_version);
1477 DBGOUT(STRLIT("incoming connection will use protcol version ");
1478 NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
1479 ADD_EVENTS(
1480 add_event(string_arg("incoming connection will use protcol version"));
1481 add_event(string_arg( xcom_proto_to_str(rfd->x_proto)));
1482 );
/* Refuse anything newer than our own version */
1483 if(rfd->x_proto > my_xcom_version)
1484 TASK_FAIL;
1485 set_connected(rfd, CON_PROTO);
/* Answer with the negotiated version, echoing the tag of the request */
1486 TASK_CALL(send_proto(rfd, rfd->x_proto, x_version_reply, ep->tag, ret));
1487 } else if (ep->x_type == x_version_reply){
1488 /* Mark connection with negotiated protocol version */
/* A reply whose tag differs from rfd->snd_tag is ignored */
1489 if(rfd->snd_tag == ep->tag){
1490 rfd->x_proto = ep->x_version;
1491 DBGOUT(STRLIT("peer connection will use protcol version ");
1492 NDBG(rfd->x_proto,u); STRLIT(xcom_proto_to_str(rfd->x_proto)));
1493 ADD_EVENTS(
1494 add_event(string_arg("peer connection will use protcol version"));
1495 add_event(string_arg( xcom_proto_to_str(rfd->x_proto)));
1496 );
1497 if(rfd->x_proto > my_xcom_version || rfd->x_proto == x_unknown_proto)
1498 TASK_FAIL;
1499 
1500 set_connected(rfd, CON_PROTO);
1501 }
1502 }
/* Keep reading headers until an ordinary message header shows up */
1503 }while(ep->x_type != x_normal);
1504 
1505 #ifdef XCOM_PARANOID
1506 assert(check_protoversion(ep->x_version, rfd->x_proto));
1507 #endif
1508 if (!check_protoversion(ep->x_version, rfd->x_proto)) {
1509 TASK_FAIL;
1510 }
1511 
1512 /* OK, we can grok this version */
1513 
1514 /* Allocate buffer space for message */
/* NOTE(review): msgsize is taken from the received header and no upper bound is applied here - confirm whether one is enforced elsewhere */
1515 ep->bytes = calloc(1, ep->msgsize);
1516 if(!ep->bytes){
1517 TASK_FAIL;
1518 }
1519 /* Read message */
1520 ep->n = 0;
1521 TASK_CALL(buffered_read_bytes(rfd, buf, ep->bytes, ep->msgsize, s, &ep->n));
1522 
1523 if (ep->n > 0) {
1524 /* Deserialize message */
1525 deserialize_ok = deserialize_msg(p, rfd->x_proto, ep->bytes, ep->msgsize);
1526 MAY_DBG(FN; STRLIT(" deserialized message"));
1527 
1528 }
1529 /* Deallocate buffer */
/* Freed before the failure check so that both outcomes release the buffer */
1530 X_FREE(ep->bytes);
1531 if (ep->n <= 0 || !deserialize_ok) {
1532 DBGOUT(FN; NDBG(ep->n, d); NDBG(deserialize_ok,d));
1533 TASK_FAIL;
1534 }
1535 TASK_RETURN(ep->n);
1536 FINALLY
1537 TASK_END;
1538 }
1539
/*
  Read a single message header from rfd and report its protocol fields.

  Exactly MSG_HDR_SIZE bytes are read with read_bytes(), without a
  server to notify.  On success *x_proto receives the protocol version
  found in the header, *x_type the message type and *tag the tag.  The
  message size in the header is decoded but not reported to the caller.

  The task finishes with TASK_RETURN(ep->n) after a full header, or with
  TASK_FAIL if fewer than MSG_HDR_SIZE bytes were read.
*/
recv_proto(connection_descriptor const * rfd,xcom_proto * x_proto,x_msg_type * x_type,unsigned int * tag,int64_t * ret)1540 int recv_proto(connection_descriptor const * rfd, xcom_proto *x_proto,
1541 x_msg_type *x_type, unsigned int *tag, int64_t *ret)
1542 {
/* Task environment: read result, raw header and the (unused) size field */
1543 DECL_ENV
1544 int64_t n;
1545 unsigned char header_buf[MSG_HDR_SIZE];
1546 uint32_t msgsize;
1547 END_ENV;
1548 
1549 TASK_BEGIN
1550 
1551 /* Read length field, protocol version, and checksum */
1552 ep->n = 0;
1553 TASK_CALL(read_bytes(rfd, (char*)ep->header_buf, MSG_HDR_SIZE, 0, &ep->n));
1554 
1555 if (ep->n != MSG_HDR_SIZE) {
1556 DBGOUT(FN; NDBG(ep->n, ll));
1557 TASK_FAIL;
1558 }
1559 
1560 *x_proto = read_protoversion(VERS_PTR(ep->header_buf));
1561 get_header_1_0(ep->header_buf, &ep->msgsize, x_type, tag);
1562 TASK_RETURN(ep->n);
1563 FINALLY
1564 TASK_END;
1565 }
1566
1567
1568 /* }}} */
1569
1570 /* {{{ Sender task */
/* purecov: begin deadcode */
/*
  Compare two tags, looking only at their low 16 bits.

  Returns 1 if the low 16 bits of tag1 and tag2 are equal, otherwise 0.

  Defined without the inline specifier: in C99/C11 a non-static inline
  definition does not by itself provide an external definition, so a
  call that is not inlined could be left unresolved at link time.
*/
int tag_check(unsigned int tag1, unsigned int tag2)
{
  return (tag1 & 0xffff) == (tag2 & 0xffff);
}
/* purecov: end */
/* Return the tag that follows tag; tags wrap around after 0xffff */
static inline unsigned int incr_tag(unsigned int tag)
{
  return (tag + 1) & 0xffff;
}
1582
/*
  Put a message link without a message (created by
  msg_link_new(0, VOID_NODE_NO)) at the front of the outgoing channel.
  sender_task() treats a link whose message pointer is 0 as a protocol
  (re)negotiation request.
*/
static void start_protocol_negotiation(channel *outgoing)
{
  msg_link *link;

  link = msg_link_new(0, VOID_NODE_NO);
  MAY_DBG(FN; PTREXP(outgoing);
          COPY_AND_FREE_GOUT(dbg_msg_link(link));
  );
  channel_put_front(outgoing, &link->l);
}
1591
/* First tag used for protocol negotiation requests (see sender_task and client_task) */
1592 #define TAG_START 313
1593 
1594 /* Fetch messages from queue and send to other server. Having a
1595 separate queue and task for doing this simplifies the logic since we
1596 never need to wait to send.

arg carries the server to serve.  The task holds a reference on that
server (srv_ref/srv_unref) for as long as it runs.

Outer loop: dial until the connection is up, discarding queued messages
after every attempt, then reset the output buffer and queue a protocol
negotiation request at the front of the outgoing channel.

Inner loop, while connected: flush the output buffer whenever the queue
is empty, take the next link from the channel and either
- send its message with _send_msg(), or
- if the link has no message, send an x_version_req with send_proto()
  and poll proto_done() every 0.1 until the negotiation has finished.
The link is deleted at label next in every case.

On exit the outgoing channel is emptied, ep->s->sender is cleared and
any link still held is deleted. */
sender_task(task_arg arg)1597 int sender_task(task_arg arg)
1598 {
/* Task environment: the server, the link being processed and the next negotiation tag */
1599 DECL_ENV
1600 server * s;
1601 msg_link * link;
1602 unsigned int tag;
1603 END_ENV;
1604 
1605 TASK_BEGIN
1606 
1607 ep->s = (server * )get_void_arg(arg);
1608 ep->link = NULL;
1609 ep->tag = TAG_START;
1610 srv_ref(ep->s);
1611 
1612 for(;;) {
1613 /* Loop until connected */
1614 while (!is_connected(&ep->s->con)) {
1615 TASK_CALL(dial(ep->s));
/* Dial failed: wait before the next attempt */
1616 if (ep->s->con.fd < 0) {
1617 TASK_DELAY(1.000);
1618 }
/* Messages queued while dialling are dropped */
1619 empty_msg_channel(&ep->s->outgoing);
1620 }
1621 
1622 reset_srv_buf(&ep->s->out_buf);
1623 
1624 /* We are ready to start sending messages.
1625 Insert a message in the input queue to negotiate the protocol.
1626 */
1627 start_protocol_negotiation(&ep->s->outgoing);
1628 while (is_connected(&ep->s->con)) {
1629 
1630 int64_t ret;
1631 assert(!ep->link);
/* Disabled by the constant 0: this delay is never executed */
1632 if (0 && link_empty(&ep->s->outgoing.data)) {
1633 TASK_DELAY(0.1 * my_drand48());
1634 }
1635 /* FWD_ITER(&ep->s->outgoing.data, msg_link,
1636 DBGOUT(FN; PTREXP(link_iter));
1637 );
1638 */
/* Nothing more queued: flush what has been buffered so far */
1639 if (link_empty(&ep->s->outgoing.data)) {
1640 TASK_CALL(flush_srv_buf(ep->s, &ret));
1641 }
1642 CHANNEL_GET(&ep->s->outgoing, &ep->link, msg_link);
1643 {
/* This ret shadows the one declared at the top of the loop body */
1644 int64_t ret;
1645 /* DBGOUT(FN; PTREXP(stack); PTREXP(ep->link)); */
1646 MAY_DBG(FN; PTREXP(&ep->s->outgoing);
1647 COPY_AND_FREE_GOUT(dbg_msg_link(ep->link));
1648 );
1649 MAY_DBG(FN; STRLIT(" extracted ");
1650 COPY_AND_FREE_GOUT(dbg_linkage(&ep->link->l));
1651 );
1652 
1653 /* If ep->link->p is 0, it is a protocol (re)negotiation request */
1654 if(ep->link->p){
1655 ADD_EVENTS(
1656 add_event(string_arg("sending ep->link->p->synode"));
1657 add_synode_event(ep->link->p->synode);
1658 add_event(string_arg("to"));
1659 add_event(uint_arg(ep->link->p->to));
1660 add_event(string_arg(pax_op_to_str(ep->link->p->op)));
1661 );
1662 TASK_CALL(_send_msg(ep->s, ep->link->p, ep->link->to, &ret));
/* Send failed: skip the "sent" event and go on to delete the link */
1663 if(ret < 0){
1664 goto next;
1665 }
1666 ADD_EVENTS(
1667 add_event(string_arg("sent ep->link->p->synode"));
1668 add_synode_event(ep->link->p->synode);
1669 add_event(string_arg("to"));
1670 add_event(uint_arg(ep->link->p->to));
1671 add_event(string_arg(pax_op_to_str(ep->link->p->op)));
1672 );
1673 } else {
1674 set_connected(&ep->s->con, CON_FD);
1675 /* Send protocol negotiation request */
/* Retried with a fresh tag for as long as send_proto() reports a negative result */
1676 do{
1677 TASK_CALL(send_proto(&ep->s->con, my_xcom_version, x_version_req, ep->tag, &ret));
1678 if(!is_connected(&ep->s->con)){
1679 goto next;
1680 }
1681 ep->tag = incr_tag(ep->tag);
1682 }while(ret < 0);
1683 G_DEBUG("sent negotiation request for protocol %d",my_xcom_version);
1684 ADD_EVENTS(
1685 add_event(string_arg("sent negotiation request for protocol"));
1686 add_event(string_arg( xcom_proto_to_str(my_xcom_version)));
1687 );
1688 
1689 /* Wait until negotiation done.
1690 reply_handler_task will catch reply and change state */
1691 while(!proto_done(&ep->s->con)){
1692 TASK_DELAY(0.1);
1693 if(!is_connected(&ep->s->con)){
1694 goto next;
1695 }
1696 }
1697 G_DEBUG("will use protocol %d",ep->s->con.x_proto);
1698 ADD_EVENTS(
1699 add_event(string_arg("will use protocol"));
1700 add_event(string_arg( xcom_proto_to_str(ep->s->con.x_proto)));
1701 );
1702 
1703 }
1704 }
/* Common exit of one iteration: the link is released whatever happened above */
1705 next:
1706 msg_link_delete(&ep->link);
1707 /* TASK_YIELD; */
1708 }
1709 }
1710 FINALLY
1711 empty_msg_channel(&ep->s->outgoing);
1712 ep->s->sender = NULL;
1713 srv_unref(ep->s);
1714 if (ep->link)
1715 msg_link_delete(&ep->link);
1716 TASK_END;
1717 }
1718
1719 /* Fetch messages from queue and send to self.
1720 Having a separate mechanism for internal communication
1721 avoids SSL blocking when trying to connect to same thread.

arg carries the server whose outgoing channel is drained.  The task
holds a reference on that server (srv_ref/srv_unref) while it runs.

Until xcom_shutdown is set, each link taken from the channel must carry
a message.  The message's `to` field is set to its `from` field and the
message is passed directly to dispatch_op(), after which the link is
deleted.

On exit the outgoing channel is emptied, ep->s->sender is cleared and
any link still held is deleted. */
local_sender_task(task_arg arg)1722 int local_sender_task(task_arg arg)
1723 {
/* Task environment: the server and the link being processed */
1724 DECL_ENV
1725 server * s;
1726 msg_link * link;
1727 END_ENV;
1728 
1729 TASK_BEGIN
1730 
1731 ep->s = (server * )get_void_arg(arg);
1732 ep->link = NULL;
1733 srv_ref(ep->s);
1734 
1735 reset_srv_buf(&ep->s->out_buf);
1736 
1737 while (!xcom_shutdown) {
1738 
1739 assert(!ep->link);
1740 CHANNEL_GET(&ep->s->outgoing, &ep->link, msg_link);
1741 {
1742 /* DBGOUT(FN; PTREXP(stack); PTREXP(ep->link)); */
1743 MAY_DBG(FN; PTREXP(&ep->s->outgoing);
1744 COPY_AND_FREE_GOUT(dbg_msg_link(ep->link));
1745 );
1746 MAY_DBG(FN; STRLIT(" extracted ");
1747 COPY_AND_FREE_GOUT(dbg_linkage(&ep->link->l));
1748 );
/* Unlike sender_task, a link without a message is not expected here */
1749 assert(ep->link->p);
/* Address the message to its own sender and handle it in place, without any connection */
1750 ep->link->p->to = ep->link->p->from;
1751 dispatch_op(find_site_def(ep->link->p->synode), ep->link->p, NULL);
1752 }
1753 msg_link_delete(&ep->link);
1754 }
1755 FINALLY
1756 empty_msg_channel(&ep->s->outgoing);
1757 ep->s->sender = NULL;
1758 srv_unref(ep->s);
1759 if (ep->link)
1760 msg_link_delete(&ep->link);
1761 TASK_END;
1762 }
1763
1764
1765 /* }}} */
1766
/*
  Return the index of the first ':' in a, or the index of the
  terminating 0 if a contains no ':'.
*/
static int end_token(char *a)
{
  return (int)strcspn(a, ":");
}
1775
1776
/*
  Return a newly allocated, 0-terminated copy of the first i characters
  of a.  The caller owns the result and releases it with free().

  Returns NULL if the allocation fails or if i is negative.  A negative
  length used to write in front of the allocated block.
*/
static char *token_copy(char *a, int i)
{
  char *ret;

  if (i < 0)
    return NULL;

  /* calloc() zero-fills, so the copy is already terminated */
  ret = calloc(1, (size_t)i + 1);
  if (!ret)
    return ret;
  memcpy(ret, a, (size_t)i);
  return ret;
}
1790
1791
/*
  Get host name from host:port string.
  The result is the allocated copy made by token_copy() of everything in
  front of the first ':' (all of a if there is none).
*/
static char *get_name(char *a)
{
  return token_copy(a, end_token(a));
}
1798
/*
  Get host name from host:port string.

  Returns an allocated copy of the host part (see get_name), or NULL if
  a is NULL.  The NULL check mirrors the one in xcom_get_port().
*/
char *xcom_get_name(char *a)
{
  return a ? get_name(a) : NULL;
}
1804
1805
1806 /* Get port from host:port string */
get_port(char * a)1807 static xcom_port get_port(char *a)
1808 {
1809 int i = end_token(a);
1810 if (a[i] != 0 ) {
1811 int port = atoi(a+i+1);
1812 if (number_is_valid_port(port))
1813 return (xcom_port) port;
1814 }
1815 return 0;
1816 }
1817
1818
xcom_get_port(char * a)1819 xcom_port xcom_get_port(char *a)
1820 {
1821 return a ? get_port(a): 0;
1822 }
1823
1824
/*
  Look through the first n entries of table for a server whose srv
  string equals name and whose port equals port.  Empty (NULL) slots are
  skipped.  Returns the first match, or 0 if there is none.
*/
static server *find_server(server *table[], int n, char *name, xcom_port port)
{
  int i = 0;

  while (i < n) {
    server *candidate = table[i++];

    if (!candidate)
      continue;
    /* FIXME should use IP address */
    if (strcmp(candidate->srv, name) == 0 && candidate->port == port)
      return candidate;
  }
  return 0;
}
1835
1836
/*
  Fill in s->servers[] from the node list of s.

  For every node address the host name and port are extracted.  A server
  already present in all_servers with that name and port is re-used and
  its invalid flag is cleared.  Otherwise a new server is created with
  addsrv(), using xcom_listen_port when the address carries no usable
  port.  The remaining slots up to NSERVERS are set to 0.

  If operation is force_config_type, the servers of the previous site
  definition that are absent from s are marked invalid.

  Does nothing if s is NULL.
*/
void update_servers(site_def *s, cargo_type operation)
{
  u_int n;
  u_int i;

  if (!s)
    return;

  n = s->nodes.node_list_len;

  DBGOUT(FN; NDBG(get_maxnodes(s), u); NDBG(n, d); PTREXP(s));

  for (i = 0; i < n; i++) {
    char *addr = s->nodes.node_list_val[i].address;
    char *name = get_name(addr);
    xcom_port port = get_port(addr);
    server *sp = find_server(all_servers, maxservers, name, port);

    if (!sp) {
      /* No server? Create one. name is handed to addsrv() and not freed here */
      DBGOUT(FN; STRLIT("creating new server "); NDBG(i, d); STREXP(name));
      if (port > 0)
        s->servers[i] = addsrv(name, port);
      else
        s->servers[i] = addsrv(name, xcom_listen_port);
      continue;
    }

    DBGOUT(FN; STRLIT("re-using server "); NDBG(i, d); STREXP(name));
    free(name);
    s->servers[i] = sp;
    if (sp->invalid)
      sp->invalid = 0;
  }

  /* Zero the rest */
  for (i = n; i < NSERVERS; i++)
    s->servers[i] = 0;

  /*
    If we have a force config, mark the servers that do not belong to this
    configuration as invalid
  */
  if (operation == force_config_type)
    invalidate_servers(get_prev_site_def(), s);
}
1883
1884
1885 /*
1886 Make a diff between 2 site_defs and mark as invalid servers
1887 that do not belong to the new site_def.
1888 This is only to be used if we are forcing a configuration.
1889 */
invalidate_servers(const site_def * old_site_def,const site_def * new_site_def)1890 void invalidate_servers(const site_def* old_site_def,
1891 const site_def* new_site_def) {
1892 u_int node= 0;
1893 for(; node < get_maxnodes(old_site_def); node++){
1894 node_address* node_addr_from_old_site_def= &old_site_def->nodes.node_list_val[node];
1895 if(!node_exists(node_addr_from_old_site_def, &new_site_def->nodes))
1896 {
1897 char *addr = node_addr_from_old_site_def->address;
1898 char *name = get_name(addr);
1899 xcom_port port = get_port(addr);
1900 server *sp = find_server(all_servers, maxservers, name, port);
1901 if (sp) {
1902 sp->invalid= 1;
1903 }
1904 free(name);
1905 }
1906 }
1907 }
1908
1909
1910 /* Remove tcp connections which seem to be idle */
tcp_reaper_task(task_arg arg MY_ATTRIBUTE ((unused)))1911 int tcp_reaper_task(task_arg arg MY_ATTRIBUTE((unused)))
1912 {
1913 DECL_ENV
1914 int dummy;
1915 END_ENV;
1916 TASK_BEGIN
1917 while (!xcom_shutdown) {
1918 int i;
1919 double now = task_now();
1920 for (i = 0; i < maxservers; i++) {
1921 server * s = all_servers[i];
1922 if (s && s->con.fd != -1 && (s->active + 10.0) < now) {
1923 shutdown_connection(&s->con);
1924 }
1925 }
1926 TASK_DELAY(1.0);
1927 }
1928 FINALLY
1929 TASK_END;
1930 }
1931
1932
/*
  End the client task that owns environment ep: call abort() if
  ep->s->crash_on_error is set, otherwise invoke TERMINATE.
  Expands to a brace-enclosed block and is used from client_task.
*/
1933 #define TERMINATE_CLIENT(ep) { \
1934 if (ep->s->crash_on_error) \
1935 abort(); \
1936 TERMINATE; \
1937 }
1938
1939
1940 /*
1941 One-shot task to send a message to any xcom node via the client interface.
1942 The sender need not be part of any group.
1943 Any tcp connection may be used, as long as the message is a pax_msg
1944 serialized with serialize_msg. Doing it this way is simply the most
1945 convenient way of sending something to a specific address/port without blocking
1946 the task system. Error handling is very rudimentary.
1947 */
1948 /* purecov: begin deadcode */
1949 /* Try to connect to another node */
/*
  The comment block above describes client_task; client_dial is its
  connection helper.

  Connects to srv:port with connect_tcp() and stores the descriptor in
  con->fd.  If the descriptor is negative the failure is only logged and
  con is left unconnected.  Otherwise the socket gets set_nodelay()
  (when NAGLE is 0) and unblock_fd(), an SSL connect is made when
  xcom_use_ssl() says so, and con is marked connected with CON_FD.
*/
client_dial(char * srv,xcom_port port,connection_descriptor * con)1950 static int client_dial(char *srv, xcom_port port, connection_descriptor *con)
1951 {
/* Task environment; holds no state that is used here */
1952 DECL_ENV
1953 int dummy;
1954 END_ENV;
1955 
1956 TASK_BEGIN
1957 DBGOUT(FN; STRLIT(" dial "); NPUT(get_nodeno(get_site_def()), u); STRLIT(srv); NDBG(port, d));
1958 TASK_CALL(connect_tcp(srv, port, &con->fd));
1959 /* DBGOUT(FN; NDBG(con->fd,d);); */
1960 if (con->fd < 0) {
1961 DBGOUT(FN; STRLIT("could not dial "); STRLIT(srv); NDBG(port, d); );
1962 } else {
1963 if (NAGLE == 0) {
1964 set_nodelay(con->fd);
1965 }
1966 
1967 unblock_fd(con->fd);
1968 #ifdef XCOM_HAVE_OPENSSL
1969 if (xcom_use_ssl()) {
/* presumably consumed by the SSL_CONNECT macro - confirm against its definition */
1970 result ret = {0,0};
1971 SSL_CONNECT((*con), srv);
1972 }
1973 #endif
1974 DBGOUT(FN; STRLIT("connected to "); STRLIT(srv); NDBG(con->fd, d); NDBG(port, d));
1975 set_connected(con, CON_FD);
1976 }
1977 FINALLY
1978 TASK_END;
1979 }
1980
1981
/*
  One-shot task that delivers the message in an envelope to the address
  the envelope names.

  arg carries the envelope (server name, port, message, crash_on_error
  flag).  The task
  1. dials ep->s->srv:ep->s->port until connected, waiting 1.000 after a
     failed attempt,
  2. sends an x_version_req with tag TAG_START and reads the answer with
     recv_proto(),
  3. if the answer is an x_version_reply with tag TAG_START and a known
     protocol, serializes the message for that protocol and writes it
     with task_write().
  Any failure goes through TERMINATE_CLIENT.

  On exit the connection is shut down and the serialization buffer, the
  envelope's server name, its message and the envelope itself are
  released, so the task takes ownership of the envelope.
*/
client_task(task_arg arg)1982 int client_task(task_arg arg)
1983 {
/* Task environment: envelope, serialized message, connection and negotiation results */
1984 DECL_ENV
1985 envelope * s;
1986 u_int buflen;
1987 char *buf;
1988 connection_descriptor c_descriptor;
1989 xcom_proto x_proto;
1990 x_msg_type x_type;
1991 unsigned int tag;
1992 END_ENV;
1993 
1994 TASK_BEGIN
1995 
1996 ep->s = (envelope * )get_void_arg(arg);
1997 ep->c_descriptor.fd = -1;
1998 #ifdef XCOM_HAVE_OPENSSL
1999 ep->c_descriptor.ssl_fd = 0;
2000 #endif
2001 ep->buf = 0;
2002 ep->x_proto = my_xcom_version;
2003 
2004 /* Loop until connected */
2005 while (!is_connected(&ep->c_descriptor)) {
2006 TASK_CALL(client_dial(ep->s->srv, ep->s->port, &ep->c_descriptor));
2007 if (ep->c_descriptor.fd < 0) {
2008 TASK_DELAY(1.000);
2009 }
2010 }
2011 
2012 
2013 #ifdef XCOM_HAVE_OPENSSL
/* NOTE(review): client_dial already runs SSL_CONNECT on this descriptor - confirm that a second connect here is intended */
2014 if (xcom_use_ssl()) {
2015 result ret = {
2016 0, 0 };
2017 SSL_CONNECT(ep->c_descriptor, ep->s->srv);
2018 }
2019 #endif
2020 {
2021 int64_t sent;
2022 int64_t n;
2023 /* Send protocol negotiation request */
2024 DBGOUT(FN);
2025 TASK_CALL(send_proto(&ep->c_descriptor, my_xcom_version, x_version_req, TAG_START, &sent));
2026 if (sent < 0) {
2027 TERMINATE_CLIENT(ep);
2028 }
2029 
2030 DBGOUT(FN);
2031 /* Wait for answer and read protocol version */
2032 TASK_CALL(recv_proto(&ep->c_descriptor, &ep->x_proto, &ep->x_type, &ep->tag, &n));
2033 if (n < 0) {
2034 TERMINATE_CLIENT(ep);
2035 }
2036 
2037 DBGOUT(FN);
/* Only a reply that echoes our tag is accepted */
2038 if (ep->tag == TAG_START && ep->x_type == x_version_reply) {
2039 DBGOUT(STRLIT("client task will use protcol version ");
2040 NDBG(ep->x_proto, u); STRLIT(xcom_proto_to_str(ep->x_proto)));
2041 if (ep->x_proto == x_unknown_proto) {
2042 TERMINATE_CLIENT(ep);
2043 }
2044 
2045 DBGOUT(FN);
2046 ep->c_descriptor.x_proto = ep->x_proto;
2047 /* Send message */
2048 serialize_msg(ep->s->p, ep->c_descriptor.x_proto, &ep->buflen, &ep->buf);
/* A zero buflen means there is nothing to write */
2049 if (ep->buflen) {
2050 DBGOUT(FN);
2051 TASK_CALL(task_write(&ep->c_descriptor, ep->buf, ep->buflen, &sent));
2052 if (ep->buflen != sent) {
2053 DBGOUT(FN; STRLIT("write failed "); STRLIT(ep->s->srv); NDBG(ep->s->port, d);
2054 NDBG(ep->buflen, d); NDBG(sent, d));
2055 TERMINATE_CLIENT(ep);
2056 }
2057 }
2058 } else {
2059 DBGOUT(FN);
2060 TERMINATE_CLIENT(ep);
2061 }
2062 }
2063 
2064 FINALLY
/* Cleanup releases everything the envelope owns, then the envelope itself */
2065 shutdown_connection(&ep->c_descriptor);
2066 X_FREE(ep->buf);
2067 free(ep->s->srv);
2068 XCOM_XDR_FREE(xdr_pax_msg, ep->s->p);
2069 free(ep->s);
2070 TASK_END;
2071 }
2072 /* purecov: end */
2073
2074 #ifdef XCOM_HAVE_OPENSSL
/* Release the SSL object of con with SSL_free() and clear con->ssl_fd */
void ssl_free_con(connection_descriptor *con)
{
  SSL_free(con->ssl_fd);
  con->ssl_fd = NULL;
}
2080
/*
  Call SSL_shutdown() on the SSL object of con and then free it.
  Does nothing unless con has both a valid fd (>= 0) and an SSL object.
*/
void ssl_shutdown_con(connection_descriptor *con)
{
  if (con->fd < 0 || con->ssl_fd == NULL)
    return;

  SSL_shutdown(con->ssl_fd);
  ssl_free_con(con);
}
2088 #endif
2089
/*
  Close the socket of con with shut_close_socket(), set con->fd to -1
  and set the connection state to CON_NULL.
*/
void close_connection(connection_descriptor *con)
{
  shut_close_socket(&con->fd);
  con->fd = -1;
  set_connected(con, CON_NULL);
}
2096
/*
  Take down con completely: record con->fd as an event, shut down the
  SSL layer when built with XCOM_HAVE_OPENSSL, then close the socket.
*/
void shutdown_connection(connection_descriptor *con)
{
  ADD_EVENTS(
    add_event(string_arg("con->fd"));
    add_event(int_arg(con->fd));
  );
#ifdef XCOM_HAVE_OPENSSL
  ssl_shutdown_con(con);
#endif
  close_connection(con);
}
2109
/*
  Put con into the unconnected state without closing anything: fd
  becomes -1, the SSL pointer (if built with XCOM_HAVE_OPENSSL) becomes
  0 and the connection state becomes CON_NULL.
*/
void reset_connection(connection_descriptor *con)
{
  con->fd = -1;
#ifdef XCOM_HAVE_OPENSSL
  con->ssl_fd = 0;
#endif
  set_connected(con, CON_NULL);
}
2118
2119 /* The protocol version used by the group as a whole is the minimum of the
2120 maximum protocol versions in the config. */
common_xcom_version(site_def const * site)2121 xcom_proto common_xcom_version(site_def const *site)
2122 {
2123 u_int i;
2124 xcom_proto min_proto = my_xcom_version;
2125 for(i = 0; i < site->nodes.node_list_len; i++){
2126 min_proto = MIN(min_proto, site->nodes.node_list_val[i].proto.max_proto);
2127 }
2128 return min_proto;
2129 }
2130
2131 static xcom_proto latest_common_proto = MY_XCOM_PROTO;
2132
set_latest_common_proto(xcom_proto x_proto)2133 xcom_proto set_latest_common_proto(xcom_proto x_proto)
2134 {
2135 return latest_common_proto = x_proto;
2136 }
2137
get_latest_common_proto()2138 xcom_proto get_latest_common_proto()
2139 {
2140 return latest_common_proto;
2141 }
2142
2143 /* See which protocol we can use.
2144 Needs to be redefined as the protocol changes */
negotiate_protocol(xcom_proto proto_vers)2145 xcom_proto negotiate_protocol(xcom_proto proto_vers)
2146 {
2147 /* Ensure that protocol will not be greater than
2148 my_xcom_version */
2149 if(proto_vers < my_min_xcom_version){
2150 return x_unknown_proto;
2151 }else if(proto_vers > my_xcom_version){
2152 return my_xcom_version;
2153 }else{
2154 return proto_vers;
2155 }
2156 }
2157
2158 /*
2159 Encode and decode node_address with protocol version 0.
2160 This version is frozen forever, so having a handcrafted (in reality mostly copied)
2161 xdr function here is OK.
2162 */
2163 /* purecov: begin deadcode */
xdr_node_address_with_1_0(XDR * xdrs,node_address * objp)2164 bool_t xdr_node_address_with_1_0 (XDR *xdrs, node_address *objp)
2165 {
2166 if (!xdr_string (xdrs, &objp->address, ~(u_int)0))
2167 return FALSE;
2168 if (!xdr_blob (xdrs, &objp->uuid))
2169 return FALSE;
2170 if (xdrs->x_op == XDR_DECODE) {
2171 objp->proto.min_proto = x_1_0; /* A node which speaks protocol version 0 only supports version 0 */
2172 objp->proto.max_proto = x_1_0;
2173 }
2174 return TRUE;
2175 }
2176 /* purecov: end */
2177
2178 /* Encode and decode a node_list while respecting protocol version */
xdr_node_list_1_1(XDR * xdrs,node_list_1_1 * objp)2179 bool_t xdr_node_list_1_1(XDR *xdrs, node_list_1_1 *objp)
2180 {
2181 xcom_proto vx = *((xcom_proto*)xdrs->x_public);
2182 /* Select protocol encode/decode based on the x_public field of the xdr struct */
2183 switch(vx){
2184 case x_1_0:
2185 return xdr_array (xdrs, (char **)&objp->node_list_val, (u_int *) &objp->node_list_len, NSERVERS,
2186 sizeof (node_address), (xdrproc_t) xdr_node_address_with_1_0);
2187 case x_1_1:
2188 case x_1_2:
2189 return xdr_array (xdrs, (char **)&objp->node_list_val, (u_int *) &objp->node_list_len, NSERVERS,
2190 sizeof (node_address), (xdrproc_t) xdr_node_address);
2191 default:
2192 return FALSE;
2193 }
2194 }
2195
2196 /* Encode and decode a application data with added check that there is enough data when decoding */
xdr_checked_data(XDR * xdrs,checked_data * objp)2197 bool_t xdr_checked_data(XDR *xdrs, checked_data *objp)
2198 {
2199 /*
2200 Sanity check. x_handy is number of remaining bytes. For old XDR,
2201 x_handy is int type. So type cast is used to eliminate a warning.
2202 */
2203 if(xdrs->x_op == XDR_DECODE && (objp->data_len + 4 )> (u_int)xdrs->x_handy)
2204 return FALSE;
2205 return xdr_bytes(xdrs, (char **)&objp->data_val, (u_int *) &objp->data_len, 0xffffffff);
2206 }
2207
xdr_pax_msg(XDR * xdrs,pax_msg * objp)2208 bool_t xdr_pax_msg(XDR *xdrs, pax_msg *objp)
2209 {
2210 xcom_proto vx = *((xcom_proto * )xdrs->x_public);
2211 /* Select protocol encode/decode based on the x_public field of the xdr struct */
2212 switch (vx) {
2213 case x_1_0:
2214 case x_1_1:
2215 if (!xdr_pax_msg_1_1(xdrs, (pax_msg_1_1*)objp))
2216 return FALSE;
2217 if (xdrs->x_op == XDR_DECODE)
2218 objp->delivered_msg = get_delivered_msg(); /* Use our own minimum */
2219 return TRUE;
2220 case x_1_2:
2221 return xdr_pax_msg_1_2(xdrs, objp);
2222 default:
2223 return FALSE;
2224 }
2225 }
2226
2227
2228