1 /*****************************************************************************\
 *  slurm_persist_conn.c - Functions for communicating over a persistent
 *                         connection within Slurm.
4  ******************************************************************************
5  *  Copyright (C) 2016 SchedMD LLC
 *  Written by Danny Auble da@schedmd.com, et al.
7  *
8  *  This file is part of Slurm, a resource management program.
9  *  For details, see <https://slurm.schedmd.com/>.
10  *  Please also read the included file: DISCLAIMER.
11  *
12  *  Slurm is free software; you can redistribute it and/or modify it under
13  *  the terms of the GNU General Public License as published by the Free
14  *  Software Foundation; either version 2 of the License, or (at your option)
15  *  any later version.
16  *
17  *  In addition, as a special exception, the copyright holders give permission
18  *  to link the code of portions of this program with the OpenSSL library under
19  *  certain conditions as described in each individual source file, and
20  *  distribute linked combinations including the two. You must obey the GNU
21  *  General Public License in all respects for all of the code used other than
22  *  OpenSSL. If you modify file(s) with this exception, you may extend this
23  *  exception to your version of the file(s), but you are not obligated to do
24  *  so. If you do not wish to do so, delete this exception statement from your
25  *  version.  If you delete this exception statement from all source files in
26  *  the program, then also delete it here.
27  *
28  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
31  *  details.
32  *
33  *  You should have received a copy of the GNU General Public License along
34  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
35  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
36 \*****************************************************************************/
37 
38 #include "config.h"
39 
40 #include <poll.h>
41 #include <pthread.h>
42 
43 #if HAVE_SYS_PRCTL_H
44 #include <sys/prctl.h>
45 #endif
46 
47 #include "slurm/slurm_errno.h"
48 #include "src/common/fd.h"
49 #include "src/common/macros.h"
50 #include "src/common/slurm_auth.h"
51 #include "src/common/slurm_protocol_pack.h"
52 #include "src/common/slurmdbd_defs.h"
53 #include "src/common/slurmdbd_pack.h"
54 #include "src/common/xsignal.h"
55 #include "slurm_persist_conn.h"
56 
57 #define MAX_THREAD_COUNT 100
58 
59 /*
60  *  Maximum message size. Messages larger than this value (in bytes)
61  *  will not be received.
62  */
63 #define MAX_MSG_SIZE     (16*1024*1024)
64 
65 typedef struct {
66 	void *arg;
67 	slurm_persist_conn_t *conn;
68 	int thread_loc;
69 	pthread_t thread_id;
70 } persist_service_conn_t;
71 
72 static persist_service_conn_t *persist_service_conn[MAX_THREAD_COUNT];
73 static int             thread_count = 0;
74 static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER;
75 static pthread_cond_t  thread_count_cond = PTHREAD_COND_INITIALIZER;
76 static int             shutdown_time = 0;
77 
/*
 * Milliseconds elapsed between *start_time and the current time of day.
 * The microsecond difference is converted by adding 500 usec and dividing
 * by 1000 (integer division).
 */
static int _tot_wait(struct timeval *start_time)
{
	struct timeval now;
	int sec_ms, usec_ms;

	gettimeofday(&now, NULL);
	sec_ms = (now.tv_sec - start_time->tv_sec) * 1000;
	usec_ms = (now.tv_usec - start_time->tv_usec + 500) / 1000;

	return sec_ms + usec_ms;
}
89 
/*
 * Close the descriptor stored in *fd and replace it with -1.
 * Only strictly positive values are closed; 0 and negative values are left
 * untouched (so descriptor 0 is never closed by this helper).
 */
static void _close_fd(int *fd)
{
	if (*fd <= 0)
		return;

	close(*fd);
	*fd = -1;
}
98 
99 /* Return true if communication failure should be logged. Only log failures
100  * every 10 minutes to avoid filling logs */
_comm_fail_log(slurm_persist_conn_t * persist_conn)101 static bool _comm_fail_log(slurm_persist_conn_t *persist_conn)
102 {
103 	time_t now = time(NULL);
104 	time_t old = now - 600;	/* Log failures once every 10 mins */
105 
106 	if (persist_conn->comm_fail_time < old) {
107 		persist_conn->comm_fail_time = now;
108 		return true;
109 	}
110 	return false;
111 }
112 
113 /* static void _reopen_persist_conn(slurm_persist_conn_t *persist_conn) */
114 /* { */
115 /*	xassert(persist_conn); */
116 /*	_close_fd(&persist_conn->fd); */
117 /*	slurm_persist_conn_open(persist_conn); */
118 /* } */
119 
120 /* Wait until a file is readable,
121  * RET false if can not be read */
_conn_readable(slurm_persist_conn_t * persist_conn)122 static bool _conn_readable(slurm_persist_conn_t *persist_conn)
123 {
124 	struct pollfd ufds;
125 	int rc, time_left;
126 
127 	xassert(persist_conn->shutdown);
128 
129 	ufds.fd     = persist_conn->fd;
130 	ufds.events = POLLIN;
131 	while (!(*persist_conn->shutdown)) {
132 		if (persist_conn->timeout) {
133 			struct timeval tstart;
134 			gettimeofday(&tstart, NULL);
135 			time_left = persist_conn->timeout - _tot_wait(&tstart);
136 		} else
137 			time_left = -1;
138 		rc = poll(&ufds, 1, time_left);
139 		if (*persist_conn->shutdown)
140 			break;
141 		if (rc == -1) {
142 			if ((errno == EINTR) || (errno == EAGAIN)) {
143 				debug3("%s: retrying poll for fd %d: %m",
144 					__func__, persist_conn->fd);
145 				continue;
146 			}
147 			error("%s: poll error for fd %d: %m",
148 			      __func__, persist_conn->fd);
149 			return false;
150 		}
151 		if (rc == 0) {
152 			debug("%s: poll for fd %d timeout after %d msecs of total wait %d msecs.",
153 			      __func__, persist_conn->fd, time_left,
154 			      persist_conn->timeout);
155 			return false;
156 		}
157 		if ((ufds.revents & POLLHUP) &&
158 		    ((ufds.revents & POLLIN) == 0)) {
159 			debug2("%s: persistent connection for fd %d closed",
160 			       __func__, persist_conn->fd);
161 			return false;
162 		}
163 		if (ufds.revents & POLLNVAL) {
164 			error("%s: persistent connection for fd %d is invalid",
165 			       __func__, persist_conn->fd);
166 			return false;
167 		}
168 		if (ufds.revents & POLLERR) {
169 			error("%s: persistent connection for fd %d experienced an error",
170 			       __func__, persist_conn->fd);
171 			return false;
172 		}
173 		if ((ufds.revents & POLLIN) == 0) {
174 			error("%s: persistent connection for fd %d missing POLLIN flag with revents 0x%"PRIx64,
175 			      __func__, persist_conn->fd, (uint64_t) ufds.revents);
176 			return false;
177 		}
178 		if (ufds.revents == POLLIN) {
179 			errno = 0;
180 			return true;
181 		}
182 
183 		fatal_abort("%s: poll returned unexpected revents: 0x%"PRIx64,
184 			    __func__, (uint64_t) ufds.revents);
185 	}
186 
187 	debug("%s: shutdown request detected for fd %d",
188 	      __func__, persist_conn->fd);
189 	return false;
190 }
191 
/* Free a service record together with the connection it holds.
 * A NULL record is ignored. */
static void _destroy_persist_service(persist_service_conn_t *persist_service)
{
	if (!persist_service)
		return;

	slurm_persist_conn_destroy(persist_service->conn);
	xfree(persist_service);
}
199 
/*
 * Deliberately empty SIGUSR1 handler.  It is installed by
 * slurm_persist_conn_recv_server_init() so that SIGUSR1 (sent by
 * slurm_persist_conn_recv_server_fini()) can interrupt blocking calls
 * without terminating the process.
 */
static void _sig_handler(int signal)
{
	(void) signal;	/* nothing to do */
}
203 
/* Release the unpacked payload of persist_msg using the free routine that
 * matches the connection's packing scheme (slurmdbd or regular RPC). */
static void _persist_free_msg_members(slurm_persist_conn_t *persist_conn,
				      persist_msg_t *persist_msg)
{
	if (!(persist_conn->flags & PERSIST_FLAG_DBD)) {
		slurm_free_msg_data(persist_msg->msg_type, persist_msg->data);
		return;
	}

	slurmdbd_free_msg(persist_msg);
}
212 
/*
 * Service loop for one persistent connection.
 *
 * Repeatedly reads a message (32-bit network-order length followed by
 * 2..MAX_MSG_SIZE bytes of payload), unpacks it, hands it to
 * persist_conn->callback_proc and sends back any response buffer produced.
 *
 * The loop ends on shutdown, EOF, a socket problem, an invalid length, a
 * short read, a failure to send a response, or when the callback returns
 * ESLURM_ACCESS_DENIED or SLURM_PROTOCOL_VERSION_ERROR.
 *
 * RET rc of the last message processed (SLURM_SUCCESS if none)
 */
static int _process_service_connection(
	slurm_persist_conn_t *persist_conn, void *arg)
{
	uint32_t nw_size = 0, msg_size = 0, uid = NO_VAL;
	char *msg_char = NULL;
	ssize_t msg_read = 0, offset = 0;
	bool first = true, fini = false;
	Buf buffer = NULL;
	int rc = SLURM_SUCCESS;

	xassert(persist_conn->callback_proc);
	xassert(persist_conn->shutdown);

	debug2("Opened connection %d from %s", persist_conn->fd,
	       persist_conn->rem_host);

	/* When set, the first message is not required to be
	 * REQUEST_PERSIST_INIT (see slurm_persist_conn_process_msg()). */
	if (persist_conn->flags & PERSIST_FLAG_ALREADY_INITED)
		first = false;

	while (!(*persist_conn->shutdown) && !fini) {
		if (!_conn_readable(persist_conn))
			break;		/* problem with this socket */
		msg_read = read(persist_conn->fd, &nw_size, sizeof(nw_size));
		if (msg_read == 0)	/* EOF */
			break;
		if (msg_read != sizeof(nw_size)) {
			error("Could not read msg_size from "
			      "connection %d(%s) uid(%u)",
			      persist_conn->fd, persist_conn->rem_host, uid);
			break;
		}
		msg_size = ntohl(nw_size);
		if ((msg_size < 2) || (msg_size > MAX_MSG_SIZE)) {
			error("Invalid msg_size (%u) from "
			      "connection %d(%s) uid(%u)",
			      msg_size, persist_conn->fd,
			      persist_conn->rem_host, uid);
			break;
		}

		msg_char = xmalloc(msg_size);
		offset = 0;
		while (msg_size > offset) {
			if (!_conn_readable(persist_conn))
				break;		/* problem with this socket */
			msg_read = read(persist_conn->fd, (msg_char + offset),
					(msg_size - offset));
			if (msg_read <= 0) {
				error("read(%d): %m", persist_conn->fd);
				break;
			}
			offset += msg_read;
		}
		if (msg_size == offset) {
			persist_msg_t msg;

			rc = slurm_persist_conn_process_msg(
				persist_conn, &msg,
				msg_char, msg_size,
				&buffer, first);

			if (rc == SLURM_SUCCESS) {
				rc = (persist_conn->callback_proc)(
					arg, &msg, &buffer, &uid);
				_persist_free_msg_members(persist_conn, &msg);
				if (rc != SLURM_SUCCESS &&
				    rc != ACCOUNTING_FIRST_REG &&
				    rc != ACCOUNTING_TRES_CHANGE_DB &&
				    rc != ACCOUNTING_NODES_CHANGE_DB) {
					error("Processing last message from "
					      "connection %d(%s) uid(%u)",
					      persist_conn->fd,
					      persist_conn->rem_host, uid);
					if (rc == ESLURM_ACCESS_DENIED ||
					    rc == SLURM_PROTOCOL_VERSION_ERROR)
						fini = true;
				}
			}
			first = false;
		} else {
			buffer = slurm_persist_make_rc_msg(
				persist_conn, SLURM_ERROR, "Bad offset", 0);
			fini = true;
		}

		xfree(msg_char);
		if (buffer) {
			if (slurm_persist_send_msg(persist_conn, buffer)
			    != SLURM_SUCCESS) {
				/* This is only an issue on persistent
				 * connections, and really isn't that big of a
				 * deal as the slurmctld will just send the
				 * message again. */
				if (persist_conn->rem_port)
					debug("Problem sending response to "
					      "connection %d(%s) uid(%u)",
					      persist_conn->fd,
					      persist_conn->rem_host, uid);
				fini = true;
			}
			free_buf(buffer);
			/*
			 * Reset it: if a later callback produces no response,
			 * the freed buffer must not be sent and freed again.
			 */
			buffer = NULL;
		}
	}

	debug2("Closed connection %d uid(%u)", persist_conn->fd, uid);

	return rc;
}
321 
_service_connection(void * arg)322 static void *_service_connection(void *arg)
323 {
324 	persist_service_conn_t *service_conn = arg;
325 
326 	xassert(service_conn);
327 	xassert(service_conn->conn);
328 
329 #if HAVE_SYS_PRCTL_H
330 	char *name = xstrdup_printf("p-%s",
331 				    service_conn->conn->cluster_name);
332 	if (prctl(PR_SET_NAME, name, NULL, NULL, NULL) < 0) {
333 		error("%s: cannot set my name to %s %m", __func__, name);
334 	}
335 	xfree(name);
336 #endif
337 
338 	service_conn->thread_id = pthread_self();
339 
340 	_process_service_connection(service_conn->conn, service_conn->arg);
341 
342 	if (service_conn->conn->callback_fini)
343 		(service_conn->conn->callback_fini)(service_conn->arg);
344 	else
345 		debug("Persist connection from cluster %s has disconnected",
346 		      service_conn->conn->cluster_name);
347 
348 	/* service_conn is freed inside here */
349 	slurm_persist_conn_free_thread_loc(service_conn->thread_loc);
350 //	xfree(service_conn);
351 
352 	/* In order to avoid zombie threads, detach the thread now before
353 	 * exiting.  slurm_persist_conn_recv_server_fini() will not try to join
354 	 * the thread because slurm_persist_conn_free_thread_loc() will have
355 	 * free'd the connection. If their are threads at shutdown, the join
356 	 * will happen before the detach so recv_fini() will wait until the
357 	 * thread is done.
358 	 *
359 	 * pthread_join man page:
360 	 * Failure to join with a thread that is joinable (i.e., one that is not
361 	 * detached), produces a "zombie thread". Avoid doing this, since each
362 	 * zombie thread consumes some system resources, and when enough zombie
363 	 * threads have accumulated, it will no longer be possible to create new
364 	 * threads (or processes).
365 	 */
366 	pthread_detach(pthread_self());
367 
368 	return NULL;
369 }
370 
slurm_persist_conn_recv_server_init(void)371 extern void slurm_persist_conn_recv_server_init(void)
372 {
373 	int sigarray[] = {SIGUSR1, 0};
374 
375 	shutdown_time = 0;
376 
377 	(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
378 	(void) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
379 
380 	/* Prepare to catch SIGUSR1 to interrupt accept().
381 	 * This signal is generated by the slurmdbd signal
382 	 * handler thread upon receipt of SIGABRT, SIGINT,
383 	 * or SIGTERM. That thread does all processing of
384 	 * all signals. */
385 	xsignal(SIGUSR1, _sig_handler);
386 	xsignal_unblock(sigarray);
387 }
388 
slurm_persist_conn_recv_server_fini(void)389 extern void slurm_persist_conn_recv_server_fini(void)
390 {
391 	int i;
392 
393 	shutdown_time = time(NULL);
394 	slurm_mutex_lock(&thread_count_lock);
395 	for (i=0; i<MAX_THREAD_COUNT; i++) {
396 		if (!persist_service_conn[i])
397 			continue;
398 		if (persist_service_conn[i]->thread_id)
399 			pthread_kill(persist_service_conn[i]->thread_id,
400 				     SIGUSR1);
401 	}
402 	/* It is faster to signal then wait since the threads would end serially
403 	 * instead of parallel if you did it all in one loop.
404 	 */
405 	for (i=0; i<MAX_THREAD_COUNT; i++) {
406 		if (!persist_service_conn[i])
407 			continue;
408 		if (persist_service_conn[i]->thread_id) {
409 			pthread_t thread_id =
410 				persist_service_conn[i]->thread_id;
411 
412 			/* Let go of lock in case the persistent connection
413 			 * thread is cleaning itself up.
414 			 * slurm_persist_conn_free_thread_loc() may be trying to
415 			 * remove itself but could be waiting on the
416 			 * thread_count mutex which this has locked. */
417 			slurm_mutex_unlock(&thread_count_lock);
418 			pthread_join(thread_id, NULL);
419 			slurm_mutex_lock(&thread_count_lock);
420 		}
421 		_destroy_persist_service(persist_service_conn[i]);
422 		persist_service_conn[i] = NULL;
423 	}
424 	slurm_mutex_unlock(&thread_count_lock);
425 }
426 
/*
 * Start a thread that services persist_conn.
 *
 * thread_loc IN - slot in persist_service_conn[] to use.  If negative, a
 *	slot is reserved with slurm_persist_conn_wait_for_thread_loc().  If no
 *	slot can be obtained (shutdown has begun), no thread is started.
 * arg IN - opaque pointer handed to the connection callbacks.
 *
 * The service record created here holds persist_conn.  The record is
 * released when the thread finishes (slurm_persist_conn_free_thread_loc())
 * or by slurm_persist_conn_recv_server_fini().
 */
extern void slurm_persist_conn_recv_thread_init(slurm_persist_conn_t *persist_conn,
						int thread_loc, void *arg)
{
	persist_service_conn_t *service_conn;

	if (thread_loc < 0)
		thread_loc = slurm_persist_conn_wait_for_thread_loc();
	if (thread_loc < 0)
		return;

	service_conn = xmalloc(sizeof(*service_conn));
	service_conn->arg = arg;
	service_conn->conn = persist_conn;
	service_conn->thread_loc = thread_loc;

	/* A zero timeout makes _conn_readable() wait without a time limit,
	 * which is what a server-side connection wants. */
	persist_conn->timeout = 0;

	/* Publish the record only after it is fully initialized. */
	slurm_mutex_lock(&thread_count_lock);
	persist_service_conn[thread_loc] = service_conn;
	slurm_mutex_unlock(&thread_count_lock);

	/* Use our own pointer rather than re-reading the shared slot without
	 * holding thread_count_lock. */
	slurm_thread_create(&service_conn->thread_id,
			    _service_connection, service_conn);
}
455 
456 /* Increment thread_count and don't return until its value is no larger
457  *	than MAX_THREAD_COUNT,
458  * RET index of free index in persist_service_conn or -1 to exit */
slurm_persist_conn_wait_for_thread_loc(void)459 extern int slurm_persist_conn_wait_for_thread_loc(void)
460 {
461 	bool print_it = true;
462 	int i, rc = -1;
463 
464 	slurm_mutex_lock(&thread_count_lock);
465 	while (1) {
466 		if (shutdown_time)
467 			break;
468 
469 		if (thread_count < MAX_THREAD_COUNT) {
470 			thread_count++;
471 			for (i=0; i<MAX_THREAD_COUNT; i++) {
472 				if (persist_service_conn[i])
473 					continue;
474 				rc = i;
475 				break;
476 			}
477 			if (rc == -1) {
478 				/* thread_count and persist_thread_id
479 				 * out of sync */
480 				fatal("No free persist_thread_id");
481 			}
482 			break;
483 		} else {
484 			/* wait for state change and retry,
485 			 * just a delay and not an error.
486 			 * This can happen when the epilog completes
487 			 * on a bunch of nodes at the same time, which
488 			 * can easily happen for highly parallel jobs. */
489 			if (print_it) {
490 				static time_t last_print_time = 0;
491 				time_t now = time(NULL);
492 				if (difftime(now, last_print_time) > 2) {
493 					verbose("thread_count over "
494 						"limit (%d), waiting",
495 						thread_count);
496 					last_print_time = now;
497 				}
498 				print_it = false;
499 			}
500 			slurm_cond_wait(&thread_count_cond, &thread_count_lock);
501 		}
502 	}
503 	slurm_mutex_unlock(&thread_count_lock);
504 	return rc;
505 }
506 
507 /* my_tid IN - Thread ID of spawned thread, 0 if no thread spawned */
slurm_persist_conn_free_thread_loc(int thread_loc)508 extern void slurm_persist_conn_free_thread_loc(int thread_loc)
509 {
510 	/* we will handle this in the fini */
511 	if (shutdown_time)
512 		return;
513 
514 	slurm_mutex_lock(&thread_count_lock);
515 	if (thread_count > 0)
516 		thread_count--;
517 	else
518 		error("thread_count underflow");
519 
520 	_destroy_persist_service(persist_service_conn[thread_loc]);
521 	persist_service_conn[thread_loc] = NULL;
522 
523 	slurm_cond_broadcast(&thread_count_cond);
524 	slurm_mutex_unlock(&thread_count_lock);
525 }
526 
/*
 * Open the connection described by persist_conn (rem_host, rem_port)
 * without the REQUEST_PERSIST_INIT handshake.
 *
 * Any previously open descriptor is closed first.  If version is unset it
 * defaults to SLURM_MIN_PROTOCOL_VERSION.  A negative timeout is replaced
 * by the configured message timeout (in msec).  On success the new
 * descriptor is made non-blocking and close-on-exec.
 *
 * RET SLURM_SUCCESS, or SLURM_ERROR if the connection can not be opened
 *     (logged at most once every 10 minutes per connection)
 */
extern int slurm_persist_conn_open_without_init(
	slurm_persist_conn_t *persist_conn)
{
	slurm_addr_t addr;

	xassert(persist_conn);
	xassert(persist_conn->rem_host);
	xassert(persist_conn->rem_port);
	xassert(persist_conn->cluster_name);

	if (persist_conn->fd <= 0)
		persist_conn->fd = -1;
	else
		_close_fd(&persist_conn->fd);

	if (!persist_conn->inited)
		persist_conn->inited = true;

	/*
	 * The minimum protocol version lets a newer controller talk to an
	 * older one.  Callers talking to the DBD should set the current
	 * protocol version before calling this.
	 */
	if (!persist_conn->version)
		persist_conn->version = SLURM_MIN_PROTOCOL_VERSION;

	if (persist_conn->timeout < 0)
		persist_conn->timeout = slurm_get_msg_timeout() * 1000;

	slurm_set_addr_char(&addr, persist_conn->rem_port,
			    persist_conn->rem_host);
	persist_conn->fd = slurm_open_msg_conn(&addr);
	if (persist_conn->fd < 0) {
		if (_comm_fail_log(persist_conn)) {
			char *s = xstrdup_printf("%s: failed to open persistent connection to %s:%d: %m",
						 __func__,
						 persist_conn->rem_host,
						 persist_conn->rem_port);
			if (persist_conn->flags & PERSIST_FLAG_SUPPRESS_ERR)
				debug2("%s", s);
			else
				error("%s", s);
			xfree(s);
		}
		return SLURM_ERROR;
	}

	fd_set_nonblocking(persist_conn->fd);
	fd_set_close_on_exec(persist_conn->fd);

	return SLURM_SUCCESS;
}
576 
577 
578 /* Open a persistent socket connection
579  * IN/OUT - persistent connection needing rem_host and rem_port filled in.
580  * Returned completely filled in.
581  * Returns SLURM_SUCCESS on success or SLURM_ERROR on failure */
slurm_persist_conn_open(slurm_persist_conn_t * persist_conn)582 extern int slurm_persist_conn_open(slurm_persist_conn_t *persist_conn)
583 {
584 	int rc = SLURM_ERROR;
585 	slurm_msg_t req_msg;
586 	persist_init_req_msg_t req;
587 	persist_rc_msg_t *resp = NULL;
588 
589 	if (slurm_persist_conn_open_without_init(persist_conn) != SLURM_SUCCESS)
590 		return rc;
591 
592 	slurm_msg_t_init(&req_msg);
593 
594 	/* Always send the lowest protocol since we don't know what version the
595 	 * other side is running yet.
596 	 */
597 	req_msg.protocol_version = persist_conn->version;
598 	req_msg.msg_type = REQUEST_PERSIST_INIT;
599 
600 	req_msg.flags |= SLURM_GLOBAL_AUTH_KEY;
601 	if (persist_conn->flags & PERSIST_FLAG_DBD)
602 		req_msg.flags |= SLURMDBD_CONNECTION;
603 
604 	memset(&req, 0, sizeof(persist_init_req_msg_t));
605 	req.cluster_name = persist_conn->cluster_name;
606 	req.persist_type = persist_conn->persist_type;
607 	req.port = persist_conn->my_port;
608 	req.version = SLURM_PROTOCOL_VERSION;
609 
610 	req_msg.data = &req;
611 
612 	if (slurm_send_node_msg(persist_conn->fd, &req_msg) < 0) {
613 		error("%s: failed to send persistent connection init message to %s:%d",
614 		      __func__, persist_conn->rem_host, persist_conn->rem_port);
615 		_close_fd(&persist_conn->fd);
616 	} else {
617 		Buf buffer = slurm_persist_recv_msg(persist_conn);
618 		persist_msg_t msg;
619 		slurm_persist_conn_t persist_conn_tmp;
620 
621 		if (!buffer) {
622 			if (_comm_fail_log(persist_conn)) {
623 				error("%s: No response to persist_init",
624 				      __func__);
625 			}
626 			_close_fd(&persist_conn->fd);
627 			goto end_it;
628 		}
629 		memset(&msg, 0, sizeof(persist_msg_t));
630 		memcpy(&persist_conn_tmp, persist_conn,
631 		       sizeof(slurm_persist_conn_t));
632 		/* The first unpack is done the same way for dbd or normal
633 		 * communication . */
634 		persist_conn_tmp.flags &= (~PERSIST_FLAG_DBD);
635 		rc = slurm_persist_msg_unpack(&persist_conn_tmp, &msg, buffer);
636 		free_buf(buffer);
637 
638 		resp = (persist_rc_msg_t *)msg.data;
639 		if (resp && (rc == SLURM_SUCCESS)) {
640 			rc = resp->rc;
641 			persist_conn->version = resp->ret_info;
642 			persist_conn->flags |= resp->flags;
643 		}
644 
645 		if (rc != SLURM_SUCCESS) {
646 			if (resp) {
647 				error("%s: Something happened with the receiving/processing of the persistent connection init message to %s:%d: %s",
648 				      __func__, persist_conn->rem_host,
649 				      persist_conn->rem_port, resp->comment);
650 			} else {
651 				error("%s: Failed to unpack persistent connection init resp message from %s:%d",
652 				      __func__,
653 				      persist_conn->rem_host,
654 				      persist_conn->rem_port);
655 			}
656 			_close_fd(&persist_conn->fd);
657 		}
658 	}
659 
660 end_it:
661 
662 	slurm_persist_free_rc_msg(resp);
663 
664 	return rc;
665 }
666 
/* Close persist_conn's descriptor, leaving the structure and its other
 * members intact.  A NULL connection is ignored. */
extern void slurm_persist_conn_close(slurm_persist_conn_t *persist_conn)
{
	if (persist_conn)
		_close_fd(&persist_conn->fd);
}
674 
/*
 * Close persist_conn and open it again.  When with_init is true, the
 * REQUEST_PERSIST_INIT handshake is repeated.
 * RET result of slurm_persist_conn_open() or
 *     slurm_persist_conn_open_without_init()
 */
extern int slurm_persist_conn_reopen(slurm_persist_conn_t *persist_conn,
				     bool with_init)
{
	slurm_persist_conn_close(persist_conn);

	return with_init ?
		slurm_persist_conn_open(persist_conn) :
		slurm_persist_conn_open_without_init(persist_conn);
}
685 
686 /* Close the persistent connection */
slurm_persist_conn_members_destroy(slurm_persist_conn_t * persist_conn)687 extern void slurm_persist_conn_members_destroy(
688 	slurm_persist_conn_t *persist_conn)
689 {
690 	if (!persist_conn)
691 		return;
692 
693 	persist_conn->inited = false;
694 	slurm_persist_conn_close(persist_conn);
695 
696 	if (persist_conn->auth_cred) {
697 		g_slurm_auth_destroy(persist_conn->auth_cred);
698 		persist_conn->auth_cred = NULL;
699 	}
700 	xfree(persist_conn->cluster_name);
701 	xfree(persist_conn->rem_host);
702 }
703 
704 /* Close the persistent connection */
slurm_persist_conn_destroy(slurm_persist_conn_t * persist_conn)705 extern void slurm_persist_conn_destroy(slurm_persist_conn_t *persist_conn)
706 {
707 	if (!persist_conn)
708 		return;
709 	slurm_persist_conn_members_destroy(persist_conn);
710 	xfree(persist_conn);
711 }
712 
/*
 * Unpack one received message and check its position in the conversation.
 * The first message must be REQUEST_PERSIST_INIT; no later message may be.
 *
 * persist_msg OUT - zeroed, then filled by slurm_persist_msg_unpack().
 * msg_char, msg_size IN - raw message.  msg_char is NOT freed here; the
 *	caller keeps ownership of it.
 * out_buffer OUT - set to an rc message describing the problem whenever
 *	the return value is not SLURM_SUCCESS; untouched otherwise.
 * first IN - true if this is the first message on the connection.
 * RET SLURM_SUCCESS, the unpack error code, or EINVAL for a missing or
 *     repeated REQUEST_PERSIST_INIT.
 *
 * NOTE(review): in the EINVAL cases persist_msg was unpacked successfully.
 * However, _process_service_connection() only frees its members on
 * SLURM_SUCCESS, so the payload appears to leak.  Confirm ownership with
 * all callers before freeing it here.
 */
extern int slurm_persist_conn_process_msg(slurm_persist_conn_t *persist_conn,
					  persist_msg_t *persist_msg,
					  char *msg_char, uint32_t msg_size,
					  Buf *out_buffer, bool first)
{
	int rc;
	Buf recv_buffer = NULL;
	char *comment = NULL;

	/* puts msg_char into buffer struct */
	recv_buffer = create_buf(msg_char, msg_size);

	memset(persist_msg, 0, sizeof(persist_msg_t));
	rc = slurm_persist_msg_unpack(persist_conn, persist_msg, recv_buffer);
	/* Delete only the buffer struct; msg_char stays allocated and is
	 * freed by the caller. */
	xfer_buf_data(recv_buffer);

	if (rc != SLURM_SUCCESS) {
		comment = xstrdup_printf("Failed to unpack %s message",
					 slurmdbd_msg_type_2_str(
						 persist_msg->msg_type, true));
		error("CONN:%d %s", persist_conn->fd, comment);
		*out_buffer = slurm_persist_make_rc_msg(
			persist_conn, rc, comment, persist_msg->msg_type);
		xfree(comment);
	} else if (first &&
		   (persist_msg->msg_type != REQUEST_PERSIST_INIT)) {
		comment = "Initial RPC not REQUEST_PERSIST_INIT";
		error("CONN:%d %s type (%d)",
		      persist_conn->fd, comment, persist_msg->msg_type);
		rc = EINVAL;
		*out_buffer = slurm_persist_make_rc_msg(
			persist_conn, rc, comment,
			REQUEST_PERSIST_INIT);
	} else if (!first &&
		   (persist_msg->msg_type == REQUEST_PERSIST_INIT)) {
		comment = "REQUEST_PERSIST_INIT sent after connection established";
		error("CONN:%d %s", persist_conn->fd, comment);
		rc = EINVAL;
		*out_buffer = slurm_persist_make_rc_msg(
			persist_conn, rc, comment, REQUEST_PERSIST_INIT);
	}

	return rc;
}
759 
760 /* Wait until a file is writeable,
761  * RET 1 if file can be written now,
762  *     0 if can not be written to within 5 seconds
763  *     -1 if file has been closed POLLHUP
764  */
slurm_persist_conn_writeable(slurm_persist_conn_t * persist_conn)765 extern int slurm_persist_conn_writeable(slurm_persist_conn_t *persist_conn)
766 {
767 	struct pollfd ufds;
768 	int write_timeout = 5000;
769 	int rc, time_left;
770 	struct timeval tstart;
771 	char temp[2];
772 
773 	xassert(persist_conn->shutdown);
774 
775 	if (persist_conn->fd < 0)
776 		return -1;
777 
778 	ufds.fd     = persist_conn->fd;
779 	ufds.events = POLLOUT;
780 	gettimeofday(&tstart, NULL);
781 	while ((*persist_conn->shutdown) == 0) {
782 		time_left = write_timeout - _tot_wait(&tstart);
783 		rc = poll(&ufds, 1, time_left);
784 		if (rc == -1) {
785 			if ((errno == EINTR) || (errno == EAGAIN))
786 				continue;
787 			error("%s: poll error: %m", __func__);
788 			return -1;
789 		}
790 		if (rc == 0)
791 			return 0;
792 		/*
793 		 * Check here to make sure the socket really is there.
794 		 * If not then exit out and notify the conn.  This
795 		 * is here since a write doesn't always tell you the
796 		 * socket is gone, but getting 0 back from a
797 		 * nonblocking read means just that.
798 		 */
799 		if (ufds.revents & POLLHUP ||
800 		    (recv(persist_conn->fd, &temp, 1, 0) == 0)) {
801 			debug2("%s: persistent connection %d is closed for writes",
802 			       __func__, persist_conn->fd);
803 			if (persist_conn->trigger_callbacks.dbd_fail)
804 				(persist_conn->trigger_callbacks.dbd_fail)();
805 			return -1;
806 		}
807 		if (ufds.revents & POLLNVAL) {
808 			error("%s: persistent connection %d is invalid",
809 			      __func__, persist_conn->fd);
810 			return 0;
811 		}
812 		if (ufds.revents & POLLERR) {
813 			if (_comm_fail_log(persist_conn)) {
814 				if (fd_get_socket_error(persist_conn->fd, &errno))
815 					error("%s: unable to get error for persistent connection %d: %m",
816 					      __func__, persist_conn->fd);
817 				else
818 					error("%s: persistent connection %d experienced an error: %m",
819 					      __func__, persist_conn->fd);
820 			}
821 			if (persist_conn->trigger_callbacks.dbd_fail)
822 				(persist_conn->trigger_callbacks.dbd_fail)();
823 			return 0;
824 		}
825 		if ((ufds.revents & POLLOUT) == 0) {
826 			error("%s: persistent connection %d events %d",
827 			      __func__, persist_conn->fd, ufds.revents);
828 			return 0;
829 		}
830 		/* revents == POLLOUT */
831 		errno = 0;
832 		return 1;
833 	}
834 	return 0;
835 }
836 
/*
 * Send buffer over persist_conn: a 32-bit network-order length followed by
 * the first get_buf_offset(buffer) bytes of the buffer.
 *
 * If the connection turns out to be closed and PERSIST_FLAG_RECONNECT is
 * set, the connection is reopened (with REQUEST_PERSIST_INIT) and the whole
 * message is sent again.  Up to 4 reconnect attempts are made.
 *
 * RET SLURM_SUCCESS on success.  Otherwise:
 *     EAGAIN - no descriptor, not writable, or a write failed;
 *     SLURM_ERROR - buffer is NULL, or reconnecting is not allowed;
 *     ESLURM_ACCESS_DENIED - errno says so after the connection closed;
 *     SLURM_COMMUNICATIONS_SEND_ERROR - reconnect attempts are exhausted.
 */
extern int slurm_persist_send_msg(
	slurm_persist_conn_t *persist_conn, Buf buffer)
{
	uint32_t msg_size, nw_size;
	char *msg;
	ssize_t msg_wrote;
	int rc, retry_cnt = 0;
	bool need_reopen;

	xassert(persist_conn);

	if (persist_conn->fd < 0)
		return EAGAIN;

	if (!buffer)
		return SLURM_ERROR;

	rc = slurm_persist_conn_writeable(persist_conn);
	need_reopen = (rc == -1);

	for (;;) {
		if (need_reopen) {
			/* if errno is ACCESS_DENIED do not try to reopen the
			 * connection, just return that */
			if (errno == ESLURM_ACCESS_DENIED)
				return ESLURM_ACCESS_DENIED;
			if (retry_cnt++ > 3)
				return SLURM_COMMUNICATIONS_SEND_ERROR;
			if (!(persist_conn->flags & PERSIST_FLAG_RECONNECT))
				return SLURM_ERROR;

			slurm_persist_conn_reopen(persist_conn, true);
			rc = slurm_persist_conn_writeable(persist_conn);
			need_reopen = false;
		}
		if (rc < 1)
			return EAGAIN;

		/* (Re)start from the beginning of the message */
		msg_size = get_buf_offset(buffer);
		nw_size = htonl(msg_size);
		msg_wrote = write(persist_conn->fd, &nw_size, sizeof(nw_size));
		if (msg_wrote != sizeof(nw_size))
			return EAGAIN;

		msg = get_buf_data(buffer);
		while (msg_size > 0) {
			rc = slurm_persist_conn_writeable(persist_conn);
			if (rc == -1) {
				need_reopen = true;
				break;
			}
			if (rc < 1)
				return EAGAIN;
			msg_wrote = write(persist_conn->fd, msg, msg_size);
			if (msg_wrote <= 0)
				return EAGAIN;
			msg += msg_wrote;
			msg_size -= msg_wrote;
		}

		if (!need_reopen)
			return SLURM_SUCCESS;
	}
}
895 
slurm_persist_recv_msg(slurm_persist_conn_t * persist_conn)896 extern Buf slurm_persist_recv_msg(slurm_persist_conn_t *persist_conn)
897 {
898 	uint32_t msg_size, nw_size;
899 	char *msg;
900 	ssize_t msg_read, offset;
901 	Buf buffer;
902 
903 	xassert(persist_conn);
904 
905 	if (persist_conn->fd < 0)
906 		return NULL;
907 
908 	if (!_conn_readable(persist_conn))
909 		goto endit;
910 
911 	msg_read = read(persist_conn->fd, &nw_size, sizeof(nw_size));
912 	if (msg_read != sizeof(nw_size))
913 		goto endit;
914 	msg_size = ntohl(nw_size);
915 	/* We don't error check for an upper limit here
916 	 * since size could possibly be massive */
917 	if (msg_size < 2) {
918 		error("Persistent Conn: Invalid msg_size (%u)", msg_size);
919 		goto endit;
920 	}
921 
922 	msg = xmalloc(msg_size);
923 	offset = 0;
924 	while (msg_size > offset) {
925 		if (!_conn_readable(persist_conn))
926 			break;		/* problem with this socket */
927 		msg_read = read(persist_conn->fd, (msg + offset),
928 				(msg_size - offset));
929 		if (msg_read <= 0) {
930 			error("Persistent Conn: read: %m");
931 			break;
932 		}
933 		offset += msg_read;
934 	}
935 	if (msg_size != offset) {
936 		if (!(*persist_conn->shutdown)) {
937 			error("Persistent Conn: only read %zd of %d bytes",
938 			      offset, msg_size);
939 		}	/* else in shutdown mode */
940 		xfree(msg);
941 		goto endit;
942 	}
943 
944 	buffer = create_buf(msg, msg_size);
945 	return buffer;
946 
947 endit:
948 	/* Close it since we abandoned it.  If the connection does still exist
949 	 * on the other end we can't rely on it after this point since we didn't
950 	 * listen long enough for this response.
951 	 */
952 	if (!(*persist_conn->shutdown) &&
953 	    persist_conn->flags & PERSIST_FLAG_RECONNECT)
954 		slurm_persist_conn_reopen(persist_conn, true);
955 
956 	return NULL;
957 }
958 
slurm_persist_msg_pack(slurm_persist_conn_t * persist_conn,persist_msg_t * req_msg)959 extern Buf slurm_persist_msg_pack(slurm_persist_conn_t *persist_conn,
960 				  persist_msg_t *req_msg)
961 {
962 	Buf buffer;
963 
964 	xassert(persist_conn);
965 
966 	if (persist_conn->flags & PERSIST_FLAG_DBD)
967 		buffer = pack_slurmdbd_msg(req_msg, persist_conn->version);
968 	else {
969 		slurm_msg_t msg;
970 
971 		slurm_msg_t_init(&msg);
972 
973 		msg.data      = req_msg->data;
974 		msg.data_size = req_msg->data_size;
975 		msg.msg_type  = req_msg->msg_type;
976 		msg.protocol_version = persist_conn->version;
977 
978 		buffer = init_buf(BUF_SIZE);
979 
980 		pack16(req_msg->msg_type, buffer);
981 		if (pack_msg(&msg, buffer) != SLURM_SUCCESS) {
982 			free_buf(buffer);
983 			return NULL;
984                 }
985 	}
986 
987 	return buffer;
988 }
989 
990 
/*
 * Deserialize a message received on persist_conn into resp_msg.
 *
 * Connections flagged PERSIST_FLAG_DBD are unpacked with
 * unpack_slurmdbd_msg().  All other connections read a 16-bit message type
 * and then call unpack_msg().  Both paths use the connection's protocol
 * version.
 *
 * When a REQUEST_PERSIST_INIT unpacks successfully, its auth credential is
 * moved into persist_conn->auth_cred.  Any previously stored credential is
 * destroyed first.
 *
 * RET the unpack routine's return code, or SLURM_ERROR if the message type
 *     could not be read on the non-DBD path.
 */
extern int slurm_persist_msg_unpack(slurm_persist_conn_t *persist_conn,
				    persist_msg_t *resp_msg, Buf buffer)
{
	int rc;

	xassert(persist_conn);
	xassert(resp_msg);

	if (persist_conn->flags & PERSIST_FLAG_DBD) {
		rc = unpack_slurmdbd_msg(resp_msg,
					 persist_conn->version,
					 buffer);
	} else {
		slurm_msg_t msg;

		slurm_msg_t_init(&msg);

		msg.protocol_version = persist_conn->version;

		safe_unpack16(&msg.msg_type, buffer);

		rc = unpack_msg(&msg, buffer);

		resp_msg->msg_type = msg.msg_type;
		resp_msg->data = msg.data;
	}

	/*
	 * Here we transfer the auth_cred to the persist_conn just in case in
	 * the future we need to use it in some way to verify things for
	 * messages that don't have one that will follow on the connection.
	 *
	 * Only do so when the unpack succeeded and produced data.  After a
	 * failed unpack resp_msg->data may be NULL or not a usable message,
	 * and dereferencing it could crash.
	 */
	if ((rc == SLURM_SUCCESS) &&
	    (resp_msg->msg_type == REQUEST_PERSIST_INIT) && resp_msg->data) {
		slurm_msg_t *msg = resp_msg->data;

		if (persist_conn->auth_cred)
			g_slurm_auth_destroy(persist_conn->auth_cred);

		persist_conn->auth_cred = msg->auth_cred;
		msg->auth_cred = NULL;
	}

	return rc;
unpack_error:
	return SLURM_ERROR;
}
1035 
/*
 * Pack a persistent-connection init request into buffer.
 *
 * The version is always packed first.  The remaining fields (cluster name,
 * persist type, port) are packed only when msg->version is at least
 * SLURM_MIN_PROTOCOL_VERSION.  Otherwise an error is logged and only the
 * version ends up in the buffer.
 */
extern void slurm_persist_pack_init_req_msg(
	persist_init_req_msg_t *msg, Buf buffer)
{
	/* always send version field first for backwards compatibility */
	pack16(msg->version, buffer);

	if (msg->version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol version %u",
		      __func__, msg->version);
		return;
	}

	packstr(msg->cluster_name, buffer);
	pack16(msg->persist_type, buffer);
	pack16(msg->port, buffer);
}
1051 
/*
 * Unpack a persistent-connection init request from buffer.
 *
 * OUT msg - receives a newly xmalloc'd request on success, NULL on failure
 * RET SLURM_SUCCESS, or SLURM_ERROR if the buffer runs short or the packed
 *     version is older than SLURM_MIN_PROTOCOL_VERSION
 */
extern int slurm_persist_unpack_init_req_msg(
	persist_init_req_msg_t **msg, Buf buffer)
{
	persist_init_req_msg_t *req = xmalloc(sizeof(persist_init_req_msg_t));
	uint32_t name_len;

	*msg = req;

	safe_unpack16(&req->version, buffer);

	if (req->version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol_version %u",
		      __func__, req->version);
		goto unpack_error;
	}

	safe_unpackstr_xmalloc(&req->cluster_name, &name_len, buffer);
	safe_unpack16(&req->persist_type, buffer);
	safe_unpack16(&req->port, buffer);

	return SLURM_SUCCESS;

unpack_error:
	/* safe_unpack* macros also jump here on a short buffer */
	slurm_persist_free_init_req_msg(req);
	*msg = NULL;
	return SLURM_ERROR;
}
1081 
/*
 * Release an init request together with its cluster name.
 * A NULL argument is ignored.
 */
extern void slurm_persist_free_init_req_msg(persist_init_req_msg_t *msg)
{
	if (!msg)
		return;

	xfree(msg->cluster_name);
	xfree(msg);
}
1089 
/*
 * Pack a persistent-connection return-code message into buffer.
 *
 * Fields are packed in the order comment, flags, rc, ret_info.  Nothing is
 * packed (and an error is logged) when protocol_version is older than
 * SLURM_MIN_PROTOCOL_VERSION.
 */
extern void slurm_persist_pack_rc_msg(
	persist_rc_msg_t *msg, Buf buffer, uint16_t protocol_version)
{
	if (protocol_version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol version %u",
		      __func__, protocol_version);
		return;
	}

	packstr(msg->comment, buffer);
	pack16(msg->flags, buffer);
	pack32(msg->rc, buffer);
	pack16(msg->ret_info, buffer);
}
1103 
/*
 * Unpack a persistent-connection return-code message from buffer.
 *
 * OUT msg - receives a newly xmalloc'd message on success, NULL on failure
 * RET SLURM_SUCCESS, or SLURM_ERROR if the buffer runs short or
 *     protocol_version is older than SLURM_MIN_PROTOCOL_VERSION
 */
extern int slurm_persist_unpack_rc_msg(
	persist_rc_msg_t **msg, Buf buffer, uint16_t protocol_version)
{
	persist_rc_msg_t *rc_msg = xmalloc(sizeof(persist_rc_msg_t));
	uint32_t comment_len;

	*msg = rc_msg;

	if (protocol_version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol_version %u",
		      __func__, protocol_version);
		goto unpack_error;
	}

	safe_unpackstr_xmalloc(&rc_msg->comment, &comment_len, buffer);
	safe_unpack16(&rc_msg->flags, buffer);
	safe_unpack32(&rc_msg->rc, buffer);
	safe_unpack16(&rc_msg->ret_info, buffer);

	return SLURM_SUCCESS;

unpack_error:
	/* safe_unpack* macros also jump here on a short buffer */
	slurm_persist_free_rc_msg(rc_msg);
	*msg = NULL;
	return SLURM_ERROR;
}
1131 
/*
 * Release a return-code message together with its comment string.
 * A NULL argument is ignored.
 */
extern void slurm_persist_free_rc_msg(persist_rc_msg_t *msg)
{
	if (!msg)
		return;

	xfree(msg->comment);
	xfree(msg);
}
1139 
slurm_persist_make_rc_msg(slurm_persist_conn_t * persist_conn,uint32_t rc,char * comment,uint16_t ret_info)1140 extern Buf slurm_persist_make_rc_msg(slurm_persist_conn_t *persist_conn,
1141 				     uint32_t rc, char *comment,
1142 				     uint16_t ret_info)
1143 {
1144 	persist_rc_msg_t msg;
1145 	persist_msg_t resp;
1146 
1147 	memset(&msg, 0, sizeof(persist_rc_msg_t));
1148 	memset(&resp, 0, sizeof(persist_msg_t));
1149 
1150 	msg.rc = rc;
1151 	msg.comment = comment;
1152 	msg.ret_info = ret_info;
1153 
1154 	resp.msg_type = PERSIST_RC;
1155 	resp.data = &msg;
1156 
1157 	return slurm_persist_msg_pack(persist_conn, &resp);
1158 }
1159 
slurm_persist_make_rc_msg_flags(slurm_persist_conn_t * persist_conn,uint32_t rc,char * comment,uint16_t flags,uint16_t ret_info)1160 extern Buf slurm_persist_make_rc_msg_flags(slurm_persist_conn_t *persist_conn,
1161 					   uint32_t rc, char *comment,
1162 					   uint16_t flags,
1163 					   uint16_t ret_info)
1164 {
1165 	persist_rc_msg_t msg;
1166 	persist_msg_t resp;
1167 
1168 	memset(&msg, 0, sizeof(persist_rc_msg_t));
1169 	memset(&resp, 0, sizeof(persist_msg_t));
1170 
1171 	msg.rc = rc;
1172 	msg.flags = flags;
1173 	msg.comment = comment;
1174 	msg.ret_info = ret_info;
1175 
1176 	resp.msg_type = PERSIST_RC;
1177 	resp.data = &msg;
1178 
1179 	return slurm_persist_msg_pack(persist_conn, &resp);
1180 }
1181