1 /*****************************************************************************\
 *  slurm_persist_conn.c - Functions for communicating over a persistent
 *  connection within Slurm.
4 ******************************************************************************
5 * Copyright (C) 2016 SchedMD LLC
6 * Written by Danny Auble da@schedmd.com, et. al.
7 *
8 * This file is part of Slurm, a resource management program.
9 * For details, see <https://slurm.schedmd.com/>.
10 * Please also read the included file: DISCLAIMER.
11 *
12 * Slurm is free software; you can redistribute it and/or modify it under
13 * the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 2 of the License, or (at your option)
15 * any later version.
16 *
17 * In addition, as a special exception, the copyright holders give permission
18 * to link the code of portions of this program with the OpenSSL library under
19 * certain conditions as described in each individual source file, and
20 * distribute linked combinations including the two. You must obey the GNU
21 * General Public License in all respects for all of the code used other than
22 * OpenSSL. If you modify file(s) with this exception, you may extend this
23 * exception to your version of the file(s), but you are not obligated to do
24 * so. If you do not wish to do so, delete this exception statement from your
25 * version. If you delete this exception statement from all source files in
26 * the program, then also delete it here.
27 *
28 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
31 * details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with Slurm; if not, write to the Free Software Foundation, Inc.,
35 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
36 \*****************************************************************************/
37
38 #include "config.h"
39
40 #include <poll.h>
41 #include <pthread.h>
42
43 #if HAVE_SYS_PRCTL_H
44 #include <sys/prctl.h>
45 #endif
46
47 #include "slurm/slurm_errno.h"
48 #include "src/common/fd.h"
49 #include "src/common/macros.h"
50 #include "src/common/slurm_auth.h"
51 #include "src/common/slurm_protocol_pack.h"
52 #include "src/common/slurmdbd_defs.h"
53 #include "src/common/slurmdbd_pack.h"
54 #include "src/common/xsignal.h"
55 #include "slurm_persist_conn.h"
56
57 #define MAX_THREAD_COUNT 100
58
59 /*
60 * Maximum message size. Messages larger than this value (in bytes)
61 * will not be received.
62 */
63 #define MAX_MSG_SIZE (16*1024*1024)
64
65 typedef struct {
66 void *arg;
67 slurm_persist_conn_t *conn;
68 int thread_loc;
69 pthread_t thread_id;
70 } persist_service_conn_t;
71
72 static persist_service_conn_t *persist_service_conn[MAX_THREAD_COUNT];
73 static int thread_count = 0;
74 static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER;
75 static pthread_cond_t thread_count_cond = PTHREAD_COND_INITIALIZER;
76 static int shutdown_time = 0;
77
/*
 * Return the number of milliseconds elapsed since "start_time", with the
 * microsecond part rounded to the nearest millisecond.
 */
static int _tot_wait(struct timeval *start_time)
{
	struct timeval now;
	int elapsed_msec;

	gettimeofday(&now, NULL);

	/* sub-second part first, then add the whole seconds */
	elapsed_msec = (now.tv_usec - start_time->tv_usec + 500) / 1000;
	elapsed_msec += (now.tv_sec - start_time->tv_sec) * 1000;

	return elapsed_msec;
}
89
/*
 * Close the descriptor stored in *fd and replace it with -1.
 * Values of zero or below are treated as "not open" and left untouched.
 */
static void _close_fd(int *fd)
{
	if (*fd <= 0)
		return;

	close(*fd);
	*fd = -1;
}
98
99 /* Return true if communication failure should be logged. Only log failures
100 * every 10 minutes to avoid filling logs */
_comm_fail_log(slurm_persist_conn_t * persist_conn)101 static bool _comm_fail_log(slurm_persist_conn_t *persist_conn)
102 {
103 time_t now = time(NULL);
104 time_t old = now - 600; /* Log failures once every 10 mins */
105
106 if (persist_conn->comm_fail_time < old) {
107 persist_conn->comm_fail_time = now;
108 return true;
109 }
110 return false;
111 }
112
113 /* static void _reopen_persist_conn(slurm_persist_conn_t *persist_conn) */
114 /* { */
115 /* xassert(persist_conn); */
116 /* _close_fd(&persist_conn->fd); */
117 /* slurm_persist_conn_open(persist_conn); */
118 /* } */
119
120 /* Wait until a file is readable,
121 * RET false if can not be read */
_conn_readable(slurm_persist_conn_t * persist_conn)122 static bool _conn_readable(slurm_persist_conn_t *persist_conn)
123 {
124 struct pollfd ufds;
125 int rc, time_left;
126
127 xassert(persist_conn->shutdown);
128
129 ufds.fd = persist_conn->fd;
130 ufds.events = POLLIN;
131 while (!(*persist_conn->shutdown)) {
132 if (persist_conn->timeout) {
133 struct timeval tstart;
134 gettimeofday(&tstart, NULL);
135 time_left = persist_conn->timeout - _tot_wait(&tstart);
136 } else
137 time_left = -1;
138 rc = poll(&ufds, 1, time_left);
139 if (*persist_conn->shutdown)
140 break;
141 if (rc == -1) {
142 if ((errno == EINTR) || (errno == EAGAIN)) {
143 debug3("%s: retrying poll for fd %d: %m",
144 __func__, persist_conn->fd);
145 continue;
146 }
147 error("%s: poll error for fd %d: %m",
148 __func__, persist_conn->fd);
149 return false;
150 }
151 if (rc == 0) {
152 debug("%s: poll for fd %d timeout after %d msecs of total wait %d msecs.",
153 __func__, persist_conn->fd, time_left,
154 persist_conn->timeout);
155 return false;
156 }
157 if ((ufds.revents & POLLHUP) &&
158 ((ufds.revents & POLLIN) == 0)) {
159 debug2("%s: persistent connection for fd %d closed",
160 __func__, persist_conn->fd);
161 return false;
162 }
163 if (ufds.revents & POLLNVAL) {
164 error("%s: persistent connection for fd %d is invalid",
165 __func__, persist_conn->fd);
166 return false;
167 }
168 if (ufds.revents & POLLERR) {
169 error("%s: persistent connection for fd %d experienced an error",
170 __func__, persist_conn->fd);
171 return false;
172 }
173 if ((ufds.revents & POLLIN) == 0) {
174 error("%s: persistent connection for fd %d missing POLLIN flag with revents 0x%"PRIx64,
175 __func__, persist_conn->fd, (uint64_t) ufds.revents);
176 return false;
177 }
178 if (ufds.revents == POLLIN) {
179 errno = 0;
180 return true;
181 }
182
183 fatal_abort("%s: poll returned unexpected revents: 0x%"PRIx64,
184 __func__, (uint64_t) ufds.revents);
185 }
186
187 debug("%s: shutdown request detected for fd %d",
188 __func__, persist_conn->fd);
189 return false;
190 }
191
/* Release a service record together with the connection it owns.
 * A NULL record is ignored. */
static void _destroy_persist_service(persist_service_conn_t *persist_service)
{
	if (!persist_service)
		return;

	slurm_persist_conn_destroy(persist_service->conn);
	xfree(persist_service);
}
199
/* Deliberately empty handler, installed for SIGUSR1 by
 * slurm_persist_conn_recv_server_init(). */
static void _sig_handler(int signal)
{
	(void) signal;	/* nothing to do with the signal number */
}
203
/* Free the payload of a received message with the routine matching the
 * connection flavor (slurmdbd or regular slurm message). */
static void _persist_free_msg_members(slurm_persist_conn_t *persist_conn,
				      persist_msg_t *persist_msg)
{
	if (!(persist_conn->flags & PERSIST_FLAG_DBD)) {
		slurm_free_msg_data(persist_msg->msg_type, persist_msg->data);
		return;
	}

	slurmdbd_free_msg(persist_msg);
}
212
/*
 * Receive and process messages on a persistent connection until the peer
 * disconnects, an unrecoverable error occurs or shutdown is requested.
 *
 * Each message is a 4 byte network-order length followed by that many
 * bytes of payload. The payload is unpacked, handed to
 * persist_conn->callback_proc and any response buffer produced is sent
 * back to the peer.
 *
 * IN persist_conn - connection to service; callback_proc and shutdown must
 *	be set
 * IN arg - opaque pointer passed through to callback_proc
 * RET return code of the last message processed (SLURM_SUCCESS if none)
 */
static int _process_service_connection(
	slurm_persist_conn_t *persist_conn, void *arg)
{
	uint32_t nw_size = 0, msg_size = 0, uid = NO_VAL;
	char *msg_char = NULL;
	ssize_t msg_read = 0, offset = 0;
	bool first = true, fini = false;
	Buf buffer = NULL;
	int rc = SLURM_SUCCESS;

	xassert(persist_conn->callback_proc);
	xassert(persist_conn->shutdown);

	debug2("Opened connection %d from %s", persist_conn->fd,
	       persist_conn->rem_host);

	/* The init handshake already happened, do not expect it again */
	if (persist_conn->flags & PERSIST_FLAG_ALREADY_INITED)
		first = false;

	while (!(*persist_conn->shutdown) && !fini) {
		if (!_conn_readable(persist_conn))
			break;		/* problem with this socket */
		msg_read = read(persist_conn->fd, &nw_size, sizeof(nw_size));
		if (msg_read == 0)	/* EOF */
			break;
		if (msg_read != sizeof(nw_size)) {
			error("Could not read msg_size from "
			      "connection %d(%s) uid(%d)",
			      persist_conn->fd, persist_conn->rem_host, uid);
			break;
		}
		msg_size = ntohl(nw_size);
		if ((msg_size < 2) || (msg_size > MAX_MSG_SIZE)) {
			error("Invalid msg_size (%u) from "
			      "connection %d(%s) uid(%d)",
			      msg_size, persist_conn->fd,
			      persist_conn->rem_host, uid);
			break;
		}

		/* Read the payload, which may arrive in several pieces */
		msg_char = xmalloc(msg_size);
		offset = 0;
		while (msg_size > offset) {
			if (!_conn_readable(persist_conn))
				break;		/* problem with this socket */
			msg_read = read(persist_conn->fd, (msg_char + offset),
					(msg_size - offset));
			if (msg_read <= 0) {
				error("read(%d): %m", persist_conn->fd);
				break;
			}
			offset += msg_read;
		}
		if (msg_size == offset) {
			persist_msg_t msg;

			rc = slurm_persist_conn_process_msg(
				persist_conn, &msg,
				msg_char, msg_size,
				&buffer, first);

			if (rc == SLURM_SUCCESS) {
				rc = (persist_conn->callback_proc)(
					arg, &msg, &buffer, &uid);
				_persist_free_msg_members(persist_conn, &msg);
				/* These accounting codes are not treated as
				 * failures of the request */
				if (rc != SLURM_SUCCESS &&
				    rc != ACCOUNTING_FIRST_REG &&
				    rc != ACCOUNTING_TRES_CHANGE_DB &&
				    rc != ACCOUNTING_NODES_CHANGE_DB) {
					error("Processing last message from "
					      "connection %d(%s) uid(%d)",
					      persist_conn->fd,
					      persist_conn->rem_host, uid);
					if (rc == ESLURM_ACCESS_DENIED ||
					    rc == SLURM_PROTOCOL_VERSION_ERROR)
						fini = true;
				}
			}
			first = false;
		} else {
			/* Incomplete payload: report it and give up */
			buffer = slurm_persist_make_rc_msg(
				persist_conn, SLURM_ERROR, "Bad offset", 0);
			fini = true;
		}

		xfree(msg_char);
		if (buffer) {
			if (slurm_persist_send_msg(persist_conn, buffer)
			    != SLURM_SUCCESS) {
				/* This is only an issue on persistent
				 * connections, and really isn't that big of a
				 * deal as the slurmctld will just send the
				 * message again. */
				if (persist_conn->rem_port)
					debug("Problem sending response to "
					      "connection %d(%s) uid(%d)",
					      persist_conn->fd,
					      persist_conn->rem_host, uid);
				fini = true;
			}
			free_buf(buffer);
			/* Never carry a dangling pointer into the next
			 * iteration: a request that produces no response
			 * must not resend (and free again) this one. */
			buffer = NULL;
		}
	}

	debug2("Closed connection %d uid(%d)", persist_conn->fd, uid);

	return rc;
}
321
_service_connection(void * arg)322 static void *_service_connection(void *arg)
323 {
324 persist_service_conn_t *service_conn = arg;
325
326 xassert(service_conn);
327 xassert(service_conn->conn);
328
329 #if HAVE_SYS_PRCTL_H
330 char *name = xstrdup_printf("p-%s",
331 service_conn->conn->cluster_name);
332 if (prctl(PR_SET_NAME, name, NULL, NULL, NULL) < 0) {
333 error("%s: cannot set my name to %s %m", __func__, name);
334 }
335 xfree(name);
336 #endif
337
338 service_conn->thread_id = pthread_self();
339
340 _process_service_connection(service_conn->conn, service_conn->arg);
341
342 if (service_conn->conn->callback_fini)
343 (service_conn->conn->callback_fini)(service_conn->arg);
344 else
345 debug("Persist connection from cluster %s has disconnected",
346 service_conn->conn->cluster_name);
347
348 /* service_conn is freed inside here */
349 slurm_persist_conn_free_thread_loc(service_conn->thread_loc);
350 // xfree(service_conn);
351
352 /* In order to avoid zombie threads, detach the thread now before
353 * exiting. slurm_persist_conn_recv_server_fini() will not try to join
354 * the thread because slurm_persist_conn_free_thread_loc() will have
355 * free'd the connection. If their are threads at shutdown, the join
356 * will happen before the detach so recv_fini() will wait until the
357 * thread is done.
358 *
359 * pthread_join man page:
360 * Failure to join with a thread that is joinable (i.e., one that is not
361 * detached), produces a "zombie thread". Avoid doing this, since each
362 * zombie thread consumes some system resources, and when enough zombie
363 * threads have accumulated, it will no longer be possible to create new
364 * threads (or processes).
365 */
366 pthread_detach(pthread_self());
367
368 return NULL;
369 }
370
slurm_persist_conn_recv_server_init(void)371 extern void slurm_persist_conn_recv_server_init(void)
372 {
373 int sigarray[] = {SIGUSR1, 0};
374
375 shutdown_time = 0;
376
377 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
378 (void) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
379
380 /* Prepare to catch SIGUSR1 to interrupt accept().
381 * This signal is generated by the slurmdbd signal
382 * handler thread upon receipt of SIGABRT, SIGINT,
383 * or SIGTERM. That thread does all processing of
384 * all signals. */
385 xsignal(SIGUSR1, _sig_handler);
386 xsignal_unblock(sigarray);
387 }
388
slurm_persist_conn_recv_server_fini(void)389 extern void slurm_persist_conn_recv_server_fini(void)
390 {
391 int i;
392
393 shutdown_time = time(NULL);
394 slurm_mutex_lock(&thread_count_lock);
395 for (i=0; i<MAX_THREAD_COUNT; i++) {
396 if (!persist_service_conn[i])
397 continue;
398 if (persist_service_conn[i]->thread_id)
399 pthread_kill(persist_service_conn[i]->thread_id,
400 SIGUSR1);
401 }
402 /* It is faster to signal then wait since the threads would end serially
403 * instead of parallel if you did it all in one loop.
404 */
405 for (i=0; i<MAX_THREAD_COUNT; i++) {
406 if (!persist_service_conn[i])
407 continue;
408 if (persist_service_conn[i]->thread_id) {
409 pthread_t thread_id =
410 persist_service_conn[i]->thread_id;
411
412 /* Let go of lock in case the persistent connection
413 * thread is cleaning itself up.
414 * slurm_persist_conn_free_thread_loc() may be trying to
415 * remove itself but could be waiting on the
416 * thread_count mutex which this has locked. */
417 slurm_mutex_unlock(&thread_count_lock);
418 pthread_join(thread_id, NULL);
419 slurm_mutex_lock(&thread_count_lock);
420 }
421 _destroy_persist_service(persist_service_conn[i]);
422 persist_service_conn[i] = NULL;
423 }
424 slurm_mutex_unlock(&thread_count_lock);
425 }
426
/*
 * Start a thread servicing a persistent connection.
 * IN persist_conn - connection to service; its timeout is forced to zero
 * IN thread_loc - slot in persist_service_conn[] to use, or a negative
 *	value to wait for a free one
 * IN arg - opaque pointer passed to the connection callbacks
 */
extern void slurm_persist_conn_recv_thread_init(slurm_persist_conn_t *persist_conn,
						int thread_loc, void *arg)
{
	persist_service_conn_t *service_conn;

	if (thread_loc < 0) {
		thread_loc = slurm_persist_conn_wait_for_thread_loc();
		if (thread_loc < 0)
			return;		/* no slot available */
	}

	service_conn = xmalloc(sizeof(*service_conn));

	/* Publish the record in its slot */
	slurm_mutex_lock(&thread_count_lock);
	persist_service_conn[thread_loc] = service_conn;
	slurm_mutex_unlock(&thread_count_lock);

	service_conn->arg = arg;
	service_conn->conn = persist_conn;
	service_conn->thread_loc = thread_loc;

	/* A non-zero timeout would keep the service thread from waiting
	 * forever for the next message, which is what is wanted here. */
	persist_conn->timeout = 0;

	slurm_thread_create(&persist_service_conn[thread_loc]->thread_id,
			    _service_connection, service_conn);
}
455
456 /* Increment thread_count and don't return until its value is no larger
457 * than MAX_THREAD_COUNT,
458 * RET index of free index in persist_service_conn or -1 to exit */
slurm_persist_conn_wait_for_thread_loc(void)459 extern int slurm_persist_conn_wait_for_thread_loc(void)
460 {
461 bool print_it = true;
462 int i, rc = -1;
463
464 slurm_mutex_lock(&thread_count_lock);
465 while (1) {
466 if (shutdown_time)
467 break;
468
469 if (thread_count < MAX_THREAD_COUNT) {
470 thread_count++;
471 for (i=0; i<MAX_THREAD_COUNT; i++) {
472 if (persist_service_conn[i])
473 continue;
474 rc = i;
475 break;
476 }
477 if (rc == -1) {
478 /* thread_count and persist_thread_id
479 * out of sync */
480 fatal("No free persist_thread_id");
481 }
482 break;
483 } else {
484 /* wait for state change and retry,
485 * just a delay and not an error.
486 * This can happen when the epilog completes
487 * on a bunch of nodes at the same time, which
488 * can easily happen for highly parallel jobs. */
489 if (print_it) {
490 static time_t last_print_time = 0;
491 time_t now = time(NULL);
492 if (difftime(now, last_print_time) > 2) {
493 verbose("thread_count over "
494 "limit (%d), waiting",
495 thread_count);
496 last_print_time = now;
497 }
498 print_it = false;
499 }
500 slurm_cond_wait(&thread_count_cond, &thread_count_lock);
501 }
502 }
503 slurm_mutex_unlock(&thread_count_lock);
504 return rc;
505 }
506
507 /* my_tid IN - Thread ID of spawned thread, 0 if no thread spawned */
slurm_persist_conn_free_thread_loc(int thread_loc)508 extern void slurm_persist_conn_free_thread_loc(int thread_loc)
509 {
510 /* we will handle this in the fini */
511 if (shutdown_time)
512 return;
513
514 slurm_mutex_lock(&thread_count_lock);
515 if (thread_count > 0)
516 thread_count--;
517 else
518 error("thread_count underflow");
519
520 _destroy_persist_service(persist_service_conn[thread_loc]);
521 persist_service_conn[thread_loc] = NULL;
522
523 slurm_cond_broadcast(&thread_count_cond);
524 slurm_mutex_unlock(&thread_count_lock);
525 }
526
/*
 * Open the socket of a persistent connection without performing the
 * REQUEST_PERSIST_INIT handshake.
 * IN/OUT persist_conn - needs rem_host, rem_port and cluster_name set; fd,
 *	inited and, when unset, version and timeout are filled in
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
extern int slurm_persist_conn_open_without_init(
	slurm_persist_conn_t *persist_conn)
{
	slurm_addr_t addr;

	xassert(persist_conn);
	xassert(persist_conn->rem_host);
	xassert(persist_conn->rem_port);
	xassert(persist_conn->cluster_name);

	/* Close a socket left open (no-op for values <= 0), then mark the
	 * connection as closed in every case */
	_close_fd(&persist_conn->fd);
	persist_conn->fd = -1;

	persist_conn->inited = true;

	if (!persist_conn->version) {
		/* Set to MIN_PROTOCOL so that a higher version controller can
		 * talk to a lower protocol version controller. When talking to
		 * the DBD, the protocol version should be set to the current
		 * protocol version prior to calling this. */
		persist_conn->version = SLURM_MIN_PROTOCOL_VERSION;
	}
	/* A negative timeout selects the configured message timeout */
	if (persist_conn->timeout < 0)
		persist_conn->timeout = slurm_get_msg_timeout() * 1000;

	slurm_set_addr_char(&addr, persist_conn->rem_port,
			    persist_conn->rem_host);

	persist_conn->fd = slurm_open_msg_conn(&addr);
	if (persist_conn->fd >= 0) {
		fd_set_nonblocking(persist_conn->fd);
		fd_set_close_on_exec(persist_conn->fd);
		return SLURM_SUCCESS;
	}

	/* Failure: log it, rate limited and at the requested severity */
	if (_comm_fail_log(persist_conn)) {
		char *err_str = xstrdup_printf("%s: failed to open persistent connection to %s:%d: %m",
					       __func__,
					       persist_conn->rem_host,
					       persist_conn->rem_port);

		if (persist_conn->flags & PERSIST_FLAG_SUPPRESS_ERR)
			debug2("%s", err_str);
		else
			error("%s", err_str);
		xfree(err_str);
	}

	return SLURM_ERROR;
}
576
577
578 /* Open a persistent socket connection
579 * IN/OUT - persistent connection needing rem_host and rem_port filled in.
580 * Returned completely filled in.
581 * Returns SLURM_SUCCESS on success or SLURM_ERROR on failure */
slurm_persist_conn_open(slurm_persist_conn_t * persist_conn)582 extern int slurm_persist_conn_open(slurm_persist_conn_t *persist_conn)
583 {
584 int rc = SLURM_ERROR;
585 slurm_msg_t req_msg;
586 persist_init_req_msg_t req;
587 persist_rc_msg_t *resp = NULL;
588
589 if (slurm_persist_conn_open_without_init(persist_conn) != SLURM_SUCCESS)
590 return rc;
591
592 slurm_msg_t_init(&req_msg);
593
594 /* Always send the lowest protocol since we don't know what version the
595 * other side is running yet.
596 */
597 req_msg.protocol_version = persist_conn->version;
598 req_msg.msg_type = REQUEST_PERSIST_INIT;
599
600 req_msg.flags |= SLURM_GLOBAL_AUTH_KEY;
601 if (persist_conn->flags & PERSIST_FLAG_DBD)
602 req_msg.flags |= SLURMDBD_CONNECTION;
603
604 memset(&req, 0, sizeof(persist_init_req_msg_t));
605 req.cluster_name = persist_conn->cluster_name;
606 req.persist_type = persist_conn->persist_type;
607 req.port = persist_conn->my_port;
608 req.version = SLURM_PROTOCOL_VERSION;
609
610 req_msg.data = &req;
611
612 if (slurm_send_node_msg(persist_conn->fd, &req_msg) < 0) {
613 error("%s: failed to send persistent connection init message to %s:%d",
614 __func__, persist_conn->rem_host, persist_conn->rem_port);
615 _close_fd(&persist_conn->fd);
616 } else {
617 Buf buffer = slurm_persist_recv_msg(persist_conn);
618 persist_msg_t msg;
619 slurm_persist_conn_t persist_conn_tmp;
620
621 if (!buffer) {
622 if (_comm_fail_log(persist_conn)) {
623 error("%s: No response to persist_init",
624 __func__);
625 }
626 _close_fd(&persist_conn->fd);
627 goto end_it;
628 }
629 memset(&msg, 0, sizeof(persist_msg_t));
630 memcpy(&persist_conn_tmp, persist_conn,
631 sizeof(slurm_persist_conn_t));
632 /* The first unpack is done the same way for dbd or normal
633 * communication . */
634 persist_conn_tmp.flags &= (~PERSIST_FLAG_DBD);
635 rc = slurm_persist_msg_unpack(&persist_conn_tmp, &msg, buffer);
636 free_buf(buffer);
637
638 resp = (persist_rc_msg_t *)msg.data;
639 if (resp && (rc == SLURM_SUCCESS)) {
640 rc = resp->rc;
641 persist_conn->version = resp->ret_info;
642 persist_conn->flags |= resp->flags;
643 }
644
645 if (rc != SLURM_SUCCESS) {
646 if (resp) {
647 error("%s: Something happened with the receiving/processing of the persistent connection init message to %s:%d: %s",
648 __func__, persist_conn->rem_host,
649 persist_conn->rem_port, resp->comment);
650 } else {
651 error("%s: Failed to unpack persistent connection init resp message from %s:%d",
652 __func__,
653 persist_conn->rem_host,
654 persist_conn->rem_port);
655 }
656 _close_fd(&persist_conn->fd);
657 }
658 }
659
660 end_it:
661
662 slurm_persist_free_rc_msg(resp);
663
664 return rc;
665 }
666
/* Close the socket of a persistent connection; a NULL connection is
 * ignored. */
extern void slurm_persist_conn_close(slurm_persist_conn_t *persist_conn)
{
	if (persist_conn)
		_close_fd(&persist_conn->fd);
}
674
/*
 * Close a persistent connection and open it again.
 * IN with_init - true to redo the REQUEST_PERSIST_INIT handshake, false to
 *	only reopen the socket
 * RET return code of the open call used
 */
extern int slurm_persist_conn_reopen(slurm_persist_conn_t *persist_conn,
				     bool with_init)
{
	slurm_persist_conn_close(persist_conn);

	if (!with_init)
		return slurm_persist_conn_open_without_init(persist_conn);

	return slurm_persist_conn_open(persist_conn);
}
685
686 /* Close the persistent connection */
slurm_persist_conn_members_destroy(slurm_persist_conn_t * persist_conn)687 extern void slurm_persist_conn_members_destroy(
688 slurm_persist_conn_t *persist_conn)
689 {
690 if (!persist_conn)
691 return;
692
693 persist_conn->inited = false;
694 slurm_persist_conn_close(persist_conn);
695
696 if (persist_conn->auth_cred) {
697 g_slurm_auth_destroy(persist_conn->auth_cred);
698 persist_conn->auth_cred = NULL;
699 }
700 xfree(persist_conn->cluster_name);
701 xfree(persist_conn->rem_host);
702 }
703
704 /* Close the persistent connection */
slurm_persist_conn_destroy(slurm_persist_conn_t * persist_conn)705 extern void slurm_persist_conn_destroy(slurm_persist_conn_t *persist_conn)
706 {
707 if (!persist_conn)
708 return;
709 slurm_persist_conn_members_destroy(persist_conn);
710 xfree(persist_conn);
711 }
712
/*
 * Unpack a raw message received on a persistent connection and check that
 * REQUEST_PERSIST_INIT is the first, and only the first, message seen.
 *
 * IN persist_conn - connection the message arrived on
 * OUT persist_msg - unpacked message (zeroed first)
 * IN msg_char, msg_size - raw message; ownership stays with the caller
 * OUT out_buffer - set to a packed error response when the message is
 *	rejected, left untouched otherwise
 * IN first - true if this is the first message on the connection
 * RET SLURM_SUCCESS, the unpack error code, or EINVAL on a handshake
 *	ordering violation
 */
extern int slurm_persist_conn_process_msg(slurm_persist_conn_t *persist_conn,
					  persist_msg_t *persist_msg,
					  char *msg_char, uint32_t msg_size,
					  Buf *out_buffer, bool first)
{
	int rc;
	Buf recv_buffer;
	char *comment;
	bool is_init;

	/* Wrap msg_char in a buffer struct for the unpack routines */
	recv_buffer = create_buf(msg_char, msg_size);

	memset(persist_msg, 0, sizeof(persist_msg_t));
	rc = slurm_persist_msg_unpack(persist_conn, persist_msg, recv_buffer);

	/* Get rid of the buffer struct only; msg_char itself is left for the
	 * caller to free */
	xfer_buf_data(recv_buffer);

	if (rc != SLURM_SUCCESS) {
		comment = xstrdup_printf("Failed to unpack %s message",
					 slurmdbd_msg_type_2_str(
						 persist_msg->msg_type, true));
		error("CONN:%u %s", persist_conn->fd, comment);
		*out_buffer = slurm_persist_make_rc_msg(
			persist_conn, rc, comment, persist_msg->msg_type);
		xfree(comment);
		return rc;
	}

	/* Handshake ordering is fine when "first message" and "is an init
	 * request" agree */
	is_init = (persist_msg->msg_type == REQUEST_PERSIST_INIT);
	if (first == is_init)
		return rc;

	if (first) {
		comment = "Initial RPC not REQUEST_PERSIST_INIT";
		error("CONN:%u %s type (%d)",
		      persist_conn->fd, comment, persist_msg->msg_type);
	} else {
		comment = "REQUEST_PERSIST_INIT sent after connection established";
		error("CONN:%u %s", persist_conn->fd, comment);
	}

	rc = EINVAL;
	*out_buffer = slurm_persist_make_rc_msg(
		persist_conn, rc, comment, REQUEST_PERSIST_INIT);

	return rc;
}
759
760 /* Wait until a file is writeable,
761 * RET 1 if file can be written now,
762 * 0 if can not be written to within 5 seconds
763 * -1 if file has been closed POLLHUP
764 */
slurm_persist_conn_writeable(slurm_persist_conn_t * persist_conn)765 extern int slurm_persist_conn_writeable(slurm_persist_conn_t *persist_conn)
766 {
767 struct pollfd ufds;
768 int write_timeout = 5000;
769 int rc, time_left;
770 struct timeval tstart;
771 char temp[2];
772
773 xassert(persist_conn->shutdown);
774
775 if (persist_conn->fd < 0)
776 return -1;
777
778 ufds.fd = persist_conn->fd;
779 ufds.events = POLLOUT;
780 gettimeofday(&tstart, NULL);
781 while ((*persist_conn->shutdown) == 0) {
782 time_left = write_timeout - _tot_wait(&tstart);
783 rc = poll(&ufds, 1, time_left);
784 if (rc == -1) {
785 if ((errno == EINTR) || (errno == EAGAIN))
786 continue;
787 error("%s: poll error: %m", __func__);
788 return -1;
789 }
790 if (rc == 0)
791 return 0;
792 /*
793 * Check here to make sure the socket really is there.
794 * If not then exit out and notify the conn. This
795 * is here since a write doesn't always tell you the
796 * socket is gone, but getting 0 back from a
797 * nonblocking read means just that.
798 */
799 if (ufds.revents & POLLHUP ||
800 (recv(persist_conn->fd, &temp, 1, 0) == 0)) {
801 debug2("%s: persistent connection %d is closed for writes",
802 __func__, persist_conn->fd);
803 if (persist_conn->trigger_callbacks.dbd_fail)
804 (persist_conn->trigger_callbacks.dbd_fail)();
805 return -1;
806 }
807 if (ufds.revents & POLLNVAL) {
808 error("%s: persistent connection %d is invalid",
809 __func__, persist_conn->fd);
810 return 0;
811 }
812 if (ufds.revents & POLLERR) {
813 if (_comm_fail_log(persist_conn)) {
814 if (fd_get_socket_error(persist_conn->fd, &errno))
815 error("%s: unable to get error for persistent connection %d: %m",
816 __func__, persist_conn->fd);
817 else
818 error("%s: persistent connection %d experienced an error: %m",
819 __func__, persist_conn->fd);
820 }
821 if (persist_conn->trigger_callbacks.dbd_fail)
822 (persist_conn->trigger_callbacks.dbd_fail)();
823 return 0;
824 }
825 if ((ufds.revents & POLLOUT) == 0) {
826 error("%s: persistent connection %d events %d",
827 __func__, persist_conn->fd, ufds.revents);
828 return 0;
829 }
830 /* revents == POLLOUT */
831 errno = 0;
832 return 1;
833 }
834 return 0;
835 }
836
/*
 * Send a packed message over a persistent connection: a 4 byte network
 * order length followed by the buffer contents.
 *
 * If the connection turns out to be closed and PERSIST_FLAG_RECONNECT is
 * set, the connection is reopened and the whole message is sent again from
 * the start.
 *
 * IN persist_conn - connection to write to
 * IN buffer - message to send; bytes up to its current offset are sent
 * RET SLURM_SUCCESS,
 *     EAGAIN if the socket is not open, not writeable or a write failed,
 *     SLURM_ERROR if buffer is NULL or the connection is closed and may not
 *	 be reopened,
 *     ESLURM_ACCESS_DENIED if errno carries that value when the connection
 *	 is found closed,
 *     SLURM_COMMUNICATIONS_SEND_ERROR after too many reopen attempts
 */
extern int slurm_persist_send_msg(
	slurm_persist_conn_t *persist_conn, Buf buffer)
{
	uint32_t bytes_left, nw_size;
	char *cursor;
	ssize_t wrote;
	int rc, retry_cnt = 0;

	xassert(persist_conn);

	if (persist_conn->fd < 0)
		return EAGAIN;

	if (!buffer)
		return SLURM_ERROR;

	rc = slurm_persist_conn_writeable(persist_conn);
	while (1) {
		bool conn_lost = false;

		if (rc == -1) {
			/* If errno is ACCESS_DENIED do not try to reopen
			 * the connection, just return that */
			if (errno == ESLURM_ACCESS_DENIED)
				return ESLURM_ACCESS_DENIED;

			if (retry_cnt++ > 3)
				return SLURM_COMMUNICATIONS_SEND_ERROR;

			if (!(persist_conn->flags & PERSIST_FLAG_RECONNECT))
				return SLURM_ERROR;

			slurm_persist_conn_reopen(persist_conn, true);
			rc = slurm_persist_conn_writeable(persist_conn);
		}
		if (rc < 1)
			return EAGAIN;

		/* Length header first */
		bytes_left = get_buf_offset(buffer);
		nw_size = htonl(bytes_left);
		wrote = write(persist_conn->fd, &nw_size, sizeof(nw_size));
		if (wrote != sizeof(nw_size))
			return EAGAIN;

		/* Then the payload, in as many pieces as it takes */
		cursor = get_buf_data(buffer);
		while (bytes_left > 0) {
			rc = slurm_persist_conn_writeable(persist_conn);
			if (rc == -1) {
				/* Closed underneath us: go back and reopen */
				conn_lost = true;
				break;
			}
			if (rc < 1)
				return EAGAIN;
			wrote = write(persist_conn->fd, cursor, bytes_left);
			if (wrote <= 0)
				return EAGAIN;
			cursor += wrote;
			bytes_left -= wrote;
		}

		if (!conn_lost)
			return SLURM_SUCCESS;
	}
}
895
slurm_persist_recv_msg(slurm_persist_conn_t * persist_conn)896 extern Buf slurm_persist_recv_msg(slurm_persist_conn_t *persist_conn)
897 {
898 uint32_t msg_size, nw_size;
899 char *msg;
900 ssize_t msg_read, offset;
901 Buf buffer;
902
903 xassert(persist_conn);
904
905 if (persist_conn->fd < 0)
906 return NULL;
907
908 if (!_conn_readable(persist_conn))
909 goto endit;
910
911 msg_read = read(persist_conn->fd, &nw_size, sizeof(nw_size));
912 if (msg_read != sizeof(nw_size))
913 goto endit;
914 msg_size = ntohl(nw_size);
915 /* We don't error check for an upper limit here
916 * since size could possibly be massive */
917 if (msg_size < 2) {
918 error("Persistent Conn: Invalid msg_size (%u)", msg_size);
919 goto endit;
920 }
921
922 msg = xmalloc(msg_size);
923 offset = 0;
924 while (msg_size > offset) {
925 if (!_conn_readable(persist_conn))
926 break; /* problem with this socket */
927 msg_read = read(persist_conn->fd, (msg + offset),
928 (msg_size - offset));
929 if (msg_read <= 0) {
930 error("Persistent Conn: read: %m");
931 break;
932 }
933 offset += msg_read;
934 }
935 if (msg_size != offset) {
936 if (!(*persist_conn->shutdown)) {
937 error("Persistent Conn: only read %zd of %d bytes",
938 offset, msg_size);
939 } /* else in shutdown mode */
940 xfree(msg);
941 goto endit;
942 }
943
944 buffer = create_buf(msg, msg_size);
945 return buffer;
946
947 endit:
948 /* Close it since we abandoned it. If the connection does still exist
949 * on the other end we can't rely on it after this point since we didn't
950 * listen long enough for this response.
951 */
952 if (!(*persist_conn->shutdown) &&
953 persist_conn->flags & PERSIST_FLAG_RECONNECT)
954 slurm_persist_conn_reopen(persist_conn, true);
955
956 return NULL;
957 }
958
slurm_persist_msg_pack(slurm_persist_conn_t * persist_conn,persist_msg_t * req_msg)959 extern Buf slurm_persist_msg_pack(slurm_persist_conn_t *persist_conn,
960 persist_msg_t *req_msg)
961 {
962 Buf buffer;
963
964 xassert(persist_conn);
965
966 if (persist_conn->flags & PERSIST_FLAG_DBD)
967 buffer = pack_slurmdbd_msg(req_msg, persist_conn->version);
968 else {
969 slurm_msg_t msg;
970
971 slurm_msg_t_init(&msg);
972
973 msg.data = req_msg->data;
974 msg.data_size = req_msg->data_size;
975 msg.msg_type = req_msg->msg_type;
976 msg.protocol_version = persist_conn->version;
977
978 buffer = init_buf(BUF_SIZE);
979
980 pack16(req_msg->msg_type, buffer);
981 if (pack_msg(&msg, buffer) != SLURM_SUCCESS) {
982 free_buf(buffer);
983 return NULL;
984 }
985 }
986
987 return buffer;
988 }
989
990
/*
 * Unpack a message received over a persistent connection, using the
 * slurmdbd format when PERSIST_FLAG_DBD is set and the regular slurm
 * message format (preceded by the 16 bit message type) otherwise.
 *
 * For REQUEST_PERSIST_INIT the auth_cred carried by the message is moved
 * to persist_conn, replacing (and destroying) any previous one.
 *
 * IN/OUT persist_conn - connection the message arrived on
 * OUT resp_msg - msg_type and data are filled in
 * IN buffer - packed message
 * RET return code of the unpack routine, SLURM_ERROR if not even the
 *	message type could be read
 */
extern int slurm_persist_msg_unpack(slurm_persist_conn_t *persist_conn,
				    persist_msg_t *resp_msg, Buf buffer)
{
	int rc;

	xassert(persist_conn);
	xassert(resp_msg);

	if (persist_conn->flags & PERSIST_FLAG_DBD) {
		rc = unpack_slurmdbd_msg(resp_msg,
					 persist_conn->version,
					 buffer);
	} else {
		slurm_msg_t msg;

		slurm_msg_t_init(&msg);

		msg.protocol_version = persist_conn->version;

		safe_unpack16(&msg.msg_type, buffer);

		rc = unpack_msg(&msg, buffer);

		resp_msg->msg_type = msg.msg_type;
		resp_msg->data = msg.data;
	}

	/* Here we transfer the auth_cred to the persist_conn just in case in
	 * the future we need to use it in some way to verify things for
	 * messages that don't have one that will follow on the connection.
	 * A failed unpack can leave the type set without any data, so only
	 * touch the message when there is one.
	 */
	if ((resp_msg->msg_type == REQUEST_PERSIST_INIT) && resp_msg->data) {
		slurm_msg_t *msg = resp_msg->data;
		if (persist_conn->auth_cred)
			g_slurm_auth_destroy(persist_conn->auth_cred);

		persist_conn->auth_cred = msg->auth_cred;
		msg->auth_cred = NULL;
	}

	return rc;
unpack_error:
	return SLURM_ERROR;
}
1035
/*
 * Pack a persistent-connection init request into buffer.
 *
 * The version is written unconditionally so the receiver can always read it.
 * The remaining fields (cluster_name, persist_type, port - in that order) are
 * written only when the version is at least SLURM_MIN_PROTOCOL_VERSION;
 * otherwise an error is reported and nothing further is packed.
 *
 * IN msg - init request to pack
 * IN/OUT buffer - destination buffer
 */
extern void slurm_persist_pack_init_req_msg(
	persist_init_req_msg_t *msg, Buf buffer)
{
	/* always send version field first for backwards compatibility */
	pack16(msg->version, buffer);

	if (msg->version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol version %u",
		      __func__, msg->version);
		return;
	}

	packstr(msg->cluster_name, buffer);
	pack16(msg->persist_type, buffer);
	pack16(msg->port, buffer);
}
1051
/*
 * Unpack a persistent-connection init request from buffer.
 *
 * A new persist_init_req_msg_t is allocated and stored in *msg. The version
 * is read first; if it is at least SLURM_MIN_PROTOCOL_VERSION the
 * cluster_name, persist_type and port follow in that order.
 *
 * OUT msg - set to the newly allocated request on success, NULL on failure
 * IN buffer - buffer to read from
 * RET SLURM_SUCCESS, or SLURM_ERROR if the version is unsupported or the
 *     unpack_error path is taken (the partial message is freed in that case)
 */
extern int slurm_persist_unpack_init_req_msg(
	persist_init_req_msg_t **msg, Buf buffer)
{
	uint32_t name_len;
	persist_init_req_msg_t *init_req = xmalloc(sizeof(*init_req));

	/* publish early; reset to NULL below if anything goes wrong */
	*msg = init_req;

	safe_unpack16(&init_req->version, buffer);

	if (init_req->version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol_version %u",
		      __func__, init_req->version);
		goto unpack_error;
	}

	safe_unpackstr_xmalloc(&init_req->cluster_name, &name_len, buffer);
	safe_unpack16(&init_req->persist_type, buffer);
	safe_unpack16(&init_req->port, buffer);

	return SLURM_SUCCESS;

unpack_error:
	slurm_persist_free_init_req_msg(init_req);
	*msg = NULL;
	return SLURM_ERROR;
}
1081
/*
 * Release an init request and the cluster_name it holds.
 * Passing NULL is allowed and does nothing.
 *
 * IN msg - request to free (may be NULL)
 */
extern void slurm_persist_free_init_req_msg(persist_init_req_msg_t *msg)
{
	if (!msg)
		return;

	xfree(msg->cluster_name);
	xfree(msg);
}
1089
/*
 * Pack a return-code message into buffer.
 *
 * For protocol versions of at least SLURM_MIN_PROTOCOL_VERSION the fields are
 * written in the order comment, flags, rc, ret_info. For any older version an
 * error is reported and nothing is packed.
 *
 * IN msg - return-code message to pack
 * IN/OUT buffer - destination buffer
 * IN protocol_version - protocol version to pack for
 */
extern void slurm_persist_pack_rc_msg(
	persist_rc_msg_t *msg, Buf buffer, uint16_t protocol_version)
{
	if (protocol_version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol version %u",
		      __func__, protocol_version);
		return;
	}

	packstr(msg->comment, buffer);
	pack16(msg->flags, buffer);
	pack32(msg->rc, buffer);
	pack16(msg->ret_info, buffer);
}
1103
/*
 * Unpack a return-code message from buffer.
 *
 * A new persist_rc_msg_t is allocated and stored in *msg. For protocol
 * versions of at least SLURM_MIN_PROTOCOL_VERSION the fields are read in the
 * order comment, flags, rc, ret_info.
 *
 * OUT msg - set to the newly allocated message on success, NULL on failure
 * IN buffer - buffer to read from
 * IN protocol_version - protocol version the sender packed with
 * RET SLURM_SUCCESS, or SLURM_ERROR if the version is unsupported or the
 *     unpack_error path is taken (the partial message is freed in that case)
 */
extern int slurm_persist_unpack_rc_msg(
	persist_rc_msg_t **msg, Buf buffer, uint16_t protocol_version)
{
	uint32_t comment_len;
	persist_rc_msg_t *rc_msg = xmalloc(sizeof(*rc_msg));

	/* publish early; reset to NULL below if anything goes wrong */
	*msg = rc_msg;

	if (protocol_version < SLURM_MIN_PROTOCOL_VERSION) {
		error("%s: invalid protocol_version %u",
		      __func__, protocol_version);
		goto unpack_error;
	}

	safe_unpackstr_xmalloc(&rc_msg->comment, &comment_len, buffer);
	safe_unpack16(&rc_msg->flags, buffer);
	safe_unpack32(&rc_msg->rc, buffer);
	safe_unpack16(&rc_msg->ret_info, buffer);

	return SLURM_SUCCESS;

unpack_error:
	slurm_persist_free_rc_msg(rc_msg);
	*msg = NULL;
	return SLURM_ERROR;
}
1131
/*
 * Release a return-code message and the comment string it holds.
 * Passing NULL is allowed and does nothing.
 *
 * IN msg - message to free (may be NULL)
 */
extern void slurm_persist_free_rc_msg(persist_rc_msg_t *msg)
{
	if (!msg)
		return;

	xfree(msg->comment);
	xfree(msg);
}
1139
slurm_persist_make_rc_msg(slurm_persist_conn_t * persist_conn,uint32_t rc,char * comment,uint16_t ret_info)1140 extern Buf slurm_persist_make_rc_msg(slurm_persist_conn_t *persist_conn,
1141 uint32_t rc, char *comment,
1142 uint16_t ret_info)
1143 {
1144 persist_rc_msg_t msg;
1145 persist_msg_t resp;
1146
1147 memset(&msg, 0, sizeof(persist_rc_msg_t));
1148 memset(&resp, 0, sizeof(persist_msg_t));
1149
1150 msg.rc = rc;
1151 msg.comment = comment;
1152 msg.ret_info = ret_info;
1153
1154 resp.msg_type = PERSIST_RC;
1155 resp.data = &msg;
1156
1157 return slurm_persist_msg_pack(persist_conn, &resp);
1158 }
1159
slurm_persist_make_rc_msg_flags(slurm_persist_conn_t * persist_conn,uint32_t rc,char * comment,uint16_t flags,uint16_t ret_info)1160 extern Buf slurm_persist_make_rc_msg_flags(slurm_persist_conn_t *persist_conn,
1161 uint32_t rc, char *comment,
1162 uint16_t flags,
1163 uint16_t ret_info)
1164 {
1165 persist_rc_msg_t msg;
1166 persist_msg_t resp;
1167
1168 memset(&msg, 0, sizeof(persist_rc_msg_t));
1169 memset(&resp, 0, sizeof(persist_msg_t));
1170
1171 msg.rc = rc;
1172 msg.flags = flags;
1173 msg.comment = comment;
1174 msg.ret_info = ret_info;
1175
1176 resp.msg_type = PERSIST_RC;
1177 resp.data = &msg;
1178
1179 return slurm_persist_msg_pack(persist_conn, &resp);
1180 }
1181