1 /* $NetBSD: clvmd.c,v 1.2 2015/11/09 00:53:57 dholland Exp $ */
2
3 /*
4 * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6 *
7 * This file is part of LVM2.
8 *
9 * This copyrighted material is made available to anyone wishing to use,
10 * modify, copy, or redistribute it subject to the terms and conditions
11 * of the GNU General Public License v.2.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software Foundation,
15 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 */
17
18 /*
19 * CLVMD: Cluster LVM daemon
20 */
21
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
24
25 #include <configure.h>
26 #include <libdevmapper.h>
27
28 #include <pthread.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 #include <sys/un.h>
34 #include <sys/time.h>
35 #include <sys/ioctl.h>
36 #include <sys/utsname.h>
37 #include <netinet/in.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stddef.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <getopt.h>
46 #include <syslog.h>
47 #include <errno.h>
48 #include <limits.h>
49 #ifdef HAVE_COROSYNC_CONFDB_H
50 #include <corosync/confdb.h>
51 #endif
52
53 #include "clvmd-comms.h"
54 #include "lvm-functions.h"
55 #include "clvm.h"
56 #include "lvm-version.h"
57 #include "clvmd.h"
58 #include "refresh_clvmd.h"
59 #include "lvm-logging.h"
60
/* Boolean constants, defined only if no earlier header supplied them */
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif

/* NOTE(review): not referenced in the part of the file reviewed here —
   presumably a retry limit used further down; confirm */
#define MAX_RETRIES 4

/* Non-zero when csid 'c' equals this node's csid (our_csid), comparing
   max_csid_len bytes */
#define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
71
/* Head of the fd list. Also contains
   the cluster_socket details.
   main() sets its fd from clops->get_main_cluster_fd() and its type to
   CLUSTER_MAIN_SOCK; further clients are linked in through ->next. */
static struct local_client local_client_head;

static unsigned short global_xid = 0;	/* Last transaction ID issued */

/* Operations table of the cluster interface chosen in main(); NULL until
   one of the init_*_cluster() calls succeeds */
struct cluster_ops *clops = NULL;

/* This node's csid, filled in by clops->get_our_csid() in main() */
static char our_csid[MAX_CSID_LEN];
/* Limits of the selected cluster interface, assigned in main() alongside clops */
static unsigned max_csid_len;
static unsigned max_cluster_message;
static unsigned max_cluster_member_name_len;
84
/* Structure of items on the LVM thread list */
struct lvm_thread_cmd {
	struct dm_list list;	/* Linkage; the list head in this file is lvm_cmd_head */

	struct local_client *client;	/* Client the queued item belongs to */
	struct clvm_header *msg;	/* Message to process; main_loop() queues NULL
					   to request cleanup of a removed client */
	char csid[MAX_CSID_LEN];	/* NOTE(review): presumably the csid of the
					   originating node — confirm in add_to_lvmqueue() */
	int remote;		/* Flag */
	int msglen;		/* Length passed together with msg */
	unsigned short xid;	/* Transaction ID */
};
96
/* Current debug mode; debuglog() compares it against DEBUG_STDERR and
   DEBUG_SYSLOG, and main() sets it from the -d option */
debug_t debug;
/* LVM worker thread and its synchronisation objects; all are initialised
   or created in main() */
static pthread_t lvm_thread;
static pthread_mutex_t lvm_thread_mutex;
static pthread_cond_t lvm_thread_cond;
static pthread_mutex_t lvm_start_mutex;	/* locked by main() before the thread is created */
static struct dm_list lvm_cmd_head;
/* main_loop() runs until 'quit' becomes non-zero */
static volatile sig_atomic_t quit = 0;
/* When non-zero, main_loop() clears it and calls clops->reread_config() */
static volatile sig_atomic_t reread_config = 0;
/* Pipe created in be_daemon(); the child reports its start-up status on
   child_pipe[1] and the parent reads it from child_pipe[0] */
static int child_pipe[2];

/* Reasons the daemon failed initialisation.
   These values are passed to child_init_signal() and become the exit
   status of both the daemon and the waiting parent. */
#define DFAIL_INIT 1
#define DFAIL_LOCAL_SOCK 2
#define DFAIL_CLUSTER_IF 3
#define DFAIL_MALLOC 4
#define DFAIL_TIMEOUT 5
#define SUCCESS 0

/* Cluster interface selection; IF_AUTO makes main() call get_cluster_type() */
typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;

/* Start-routine type expected by pthread_create(); main() casts
   lvm_thread_fn to a pointer to this type */
typedef void *(lvm_pthread_fn_t)(void*);
118
/* Prototypes for code further down */

/* Signal handlers installed by main() */
static void sigusr2_handler(int sig);
static void sighup_handler(int sig);
static void sigterm_handler(int sig);

/* Replies to local clients and messages to the cluster */
static void send_local_reply(struct local_client *client, int status,
			     int clientid);
static void free_reply(struct local_client *client);
static void send_version_message(void);
static void *pre_and_post_thread(void *arg);
static int send_message(void *buf, int msglen, const char *csid, int fd,
			const char *errtext);

/* Command reading and processing */
static int read_from_local_sock(struct local_client *thisfd);
static int process_local_command(struct clvm_header *msg, int msglen,
				 struct local_client *client,
				 unsigned short xid);
static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
				   const char *csid);
static int process_reply(const struct clvm_header *msg, int msglen,
			 const char *csid);

/* Daemon start-up and the main select() loop */
static int open_local_sock(void);
static int check_local_clvmd(void);
static struct local_client *find_client(int clientid);
static void main_loop(int local_sock, int cmd_timeout);
static void be_daemon(int start_timeout);
static int check_all_clvmds_running(struct local_client *client);
static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
				     int len, const char *csid,
				     struct local_client **new_client);

/* LVM worker thread and its queue */
static void lvm_thread_fn(void *) __attribute__ ((noreturn));
static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
			   int msglen, const char *csid);
static int distribute_command(struct local_client *thisfd);

/* Header byte-order helpers and miscellaneous utilities */
static void hton_clvm(struct clvm_header *hdr);
static void ntoh_clvm(struct clvm_header *hdr);
static void add_reply_to_list(struct local_client *client, int status,
			      const char *csid, const char *buf, int len);
static if_type_t parse_cluster_interface(char *ifname);
static if_type_t get_cluster_type(void);
157
/*
 * Write the command-line help text to 'file'.
 *
 * prog - program name printed on the synopsis line
 * file - stream that receives the text (main() passes stdout for -h and
 *        stderr for option errors)
 *
 * The final line lists the cluster managers that were compiled in, as
 * selected by the USE_* build macros.
 */
static void usage(char *prog, FILE *file)
{
	/* Fixed option descriptions, emitted verbatim after the synopsis */
	static const char *const option_help[] = {
		"\n",
		" -V Show version of clvmd\n",
		" -h Show this help information\n",
		" -d Set debug level\n",
		" If starting clvmd then don't fork, run in the foreground\n",
		" -R Tell all running clvmds in the cluster to reload their device cache\n",
		" -C Sets debug level (from -d) on all clvmd instances clusterwide\n",
		" -t<secs> Command timeout (default 60 seconds)\n",
		" -T<secs> Startup timeout (default none)\n",
		" -I<cmgr> Cluster manager (default: auto)\n",
		" Available cluster managers: ",
	};
	size_t idx;

	fputs("Usage:\n", file);
	fprintf(file, "%s [Vhd]\n", prog);

	for (idx = 0; idx < sizeof(option_help) / sizeof(option_help[0]); idx++)
		fputs(option_help[idx], file);

#ifdef USE_COROSYNC
	fputs("corosync ", file);
#endif
#ifdef USE_CMAN
	fputs("cman ", file);
#endif
#ifdef USE_OPENAIS
	fputs("openais ", file);
#endif
#ifdef USE_GULM
	fputs("gulm ", file);
#endif
	fputs("\n", file);
}
187
188 /* Called to signal the parent how well we got on during initialisation */
child_init_signal(int status)189 static void child_init_signal(int status)
190 {
191 if (child_pipe[1]) {
192 write(child_pipe[1], &status, sizeof(status));
193 close(child_pipe[1]);
194 }
195 if (status)
196 exit(status);
197 }
198
199
debuglog(const char * fmt,...)200 void debuglog(const char *fmt, ...)
201 {
202 time_t P;
203 va_list ap;
204 static int syslog_init = 0;
205
206 if (debug == DEBUG_STDERR) {
207 va_start(ap,fmt);
208 time(&P);
209 fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
210 vfprintf(stderr, fmt, ap);
211 va_end(ap);
212 }
213 if (debug == DEBUG_SYSLOG) {
214 if (!syslog_init) {
215 openlog("clvmd", LOG_PID, LOG_DAEMON);
216 syslog_init = 1;
217 }
218
219 va_start(ap,fmt);
220 vsyslog(LOG_DEBUG, fmt, ap);
221 va_end(ap);
222 }
223 }
224
decode_cmd(unsigned char cmdl)225 static const char *decode_cmd(unsigned char cmdl)
226 {
227 static char buf[128];
228 const char *command;
229
230 switch (cmdl) {
231 case CLVMD_CMD_TEST:
232 command = "TEST";
233 break;
234 case CLVMD_CMD_LOCK_VG:
235 command = "LOCK_VG";
236 break;
237 case CLVMD_CMD_LOCK_LV:
238 command = "LOCK_LV";
239 break;
240 case CLVMD_CMD_REFRESH:
241 command = "REFRESH";
242 break;
243 case CLVMD_CMD_SET_DEBUG:
244 command = "SET_DEBUG";
245 break;
246 case CLVMD_CMD_GET_CLUSTERNAME:
247 command = "GET_CLUSTERNAME";
248 break;
249 case CLVMD_CMD_VG_BACKUP:
250 command = "VG_BACKUP";
251 break;
252 case CLVMD_CMD_REPLY:
253 command = "REPLY";
254 break;
255 case CLVMD_CMD_VERSION:
256 command = "VERSION";
257 break;
258 case CLVMD_CMD_GOAWAY:
259 command = "GOAWAY";
260 break;
261 case CLVMD_CMD_LOCK:
262 command = "LOCK";
263 break;
264 case CLVMD_CMD_UNLOCK:
265 command = "UNLOCK";
266 break;
267 case CLVMD_CMD_LOCK_QUERY:
268 command = "LOCK_QUERY";
269 break;
270 default:
271 command = "unknown";
272 break;
273 }
274
275 sprintf(buf, "%s (0x%x)", command, cmdl);
276
277 return buf;
278 }
279
main(int argc,char * argv[])280 int main(int argc, char *argv[])
281 {
282 int local_sock;
283 struct local_client *newfd;
284 struct utsname nodeinfo;
285 signed char opt;
286 int cmd_timeout = DEFAULT_CMD_TIMEOUT;
287 int start_timeout = 0;
288 if_type_t cluster_iface = IF_AUTO;
289 sigset_t ss;
290 int using_gulm = 0;
291 int debug_opt = 0;
292 int clusterwide_opt = 0;
293
294 /* Deal with command-line arguments */
295 opterr = 0;
296 optind = 0;
297 while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
298 switch (opt) {
299 case 'h':
300 usage(argv[0], stdout);
301 exit(0);
302
303 case '?':
304 usage(argv[0], stderr);
305 exit(0);
306
307 case 'R':
308 return refresh_clvmd()==1?0:1;
309
310 case 'C':
311 clusterwide_opt = 1;
312 break;
313
314 case 'd':
315 debug_opt = 1;
316 if (optarg)
317 debug = atoi(optarg);
318 else
319 debug = DEBUG_STDERR;
320 break;
321
322 case 't':
323 cmd_timeout = atoi(optarg);
324 if (!cmd_timeout) {
325 fprintf(stderr, "command timeout is invalid\n");
326 usage(argv[0], stderr);
327 exit(1);
328 }
329 break;
330 case 'I':
331 cluster_iface = parse_cluster_interface(optarg);
332 break;
333 case 'T':
334 start_timeout = atoi(optarg);
335 if (start_timeout <= 0) {
336 fprintf(stderr, "startup timeout is invalid\n");
337 usage(argv[0], stderr);
338 exit(1);
339 }
340 break;
341
342 case 'V':
343 printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
344 printf("Protocol version: %d.%d.%d\n",
345 CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
346 CLVMD_PATCH_VERSION);
347 exit(1);
348 break;
349
350 }
351 }
352
353 /* Setting debug options on an existing clvmd */
354 if (debug_opt && !check_local_clvmd()) {
355
356 /* Sending to stderr makes no sense for a detached daemon */
357 if (debug == DEBUG_STDERR)
358 debug = DEBUG_SYSLOG;
359 return debug_clvmd(debug, clusterwide_opt)==1?0:1;
360 }
361
362 /* Fork into the background (unless requested not to) */
363 if (debug != DEBUG_STDERR) {
364 be_daemon(start_timeout);
365 }
366
367 DEBUGLOG("CLVMD started\n");
368
369 /* Open the Unix socket we listen for commands on.
370 We do this before opening the cluster socket so that
371 potential clients will block rather than error if we are running
372 but the cluster is not ready yet */
373 local_sock = open_local_sock();
374 if (local_sock < 0)
375 child_init_signal(DFAIL_LOCAL_SOCK);
376
377 /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
378 USR2 causes child threads to exit.
379 HUP causes gulm version to re-read nodes list from CCS.
380 PIPE should be ignored */
381 signal(SIGUSR2, sigusr2_handler);
382 signal(SIGHUP, sighup_handler);
383 signal(SIGPIPE, SIG_IGN);
384
385 /* Block SIGUSR2/SIGINT/SIGTERM in process */
386 sigemptyset(&ss);
387 sigaddset(&ss, SIGUSR2);
388 sigaddset(&ss, SIGINT);
389 sigaddset(&ss, SIGTERM);
390 sigprocmask(SIG_BLOCK, &ss, NULL);
391
392 /* Initialise the LVM thread variables */
393 dm_list_init(&lvm_cmd_head);
394 pthread_mutex_init(&lvm_thread_mutex, NULL);
395 pthread_cond_init(&lvm_thread_cond, NULL);
396 pthread_mutex_init(&lvm_start_mutex, NULL);
397 init_lvhash();
398
399 /* Start the cluster interface */
400 if (cluster_iface == IF_AUTO)
401 cluster_iface = get_cluster_type();
402
403 #ifdef USE_CMAN
404 if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
405 max_csid_len = CMAN_MAX_CSID_LEN;
406 max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
407 max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
408 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
409 }
410 #endif
411 #ifdef USE_GULM
412 if (!clops)
413 if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
414 max_csid_len = GULM_MAX_CSID_LEN;
415 max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
416 max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
417 using_gulm = 1;
418 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
419 }
420 #endif
421 #ifdef USE_COROSYNC
422 if (!clops)
423 if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
424 max_csid_len = COROSYNC_CSID_LEN;
425 max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
426 max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
427 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
428 }
429 #endif
430 #ifdef USE_OPENAIS
431 if (!clops)
432 if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
433 max_csid_len = OPENAIS_CSID_LEN;
434 max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
435 max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
436 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
437 }
438 #endif
439
440 if (!clops) {
441 DEBUGLOG("Can't initialise cluster interface\n");
442 log_error("Can't initialise cluster interface\n");
443 child_init_signal(DFAIL_CLUSTER_IF);
444 }
445 DEBUGLOG("Cluster ready, doing some more initialisation\n");
446
447 /* Save our CSID */
448 uname(&nodeinfo);
449 clops->get_our_csid(our_csid);
450
451 /* Initialise the FD list head */
452 local_client_head.fd = clops->get_main_cluster_fd();
453 local_client_head.type = CLUSTER_MAIN_SOCK;
454 local_client_head.callback = clops->cluster_fd_callback;
455
456 /* Add the local socket to the list */
457 newfd = malloc(sizeof(struct local_client));
458 if (!newfd)
459 child_init_signal(DFAIL_MALLOC);
460
461 newfd->fd = local_sock;
462 newfd->removeme = 0;
463 newfd->type = LOCAL_RENDEZVOUS;
464 newfd->callback = local_rendezvous_callback;
465 newfd->next = local_client_head.next;
466 local_client_head.next = newfd;
467
468 /* This needs to be started after cluster initialisation
469 as it may need to take out locks */
470 DEBUGLOG("starting LVM thread\n");
471
472 /* Don't let anyone else to do work until we are started */
473 pthread_mutex_lock(&lvm_start_mutex);
474 pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
475 (void *)(long)using_gulm);
476
477 /* Tell the rest of the cluster our version number */
478 /* CMAN can do this immediately, gulm needs to wait until
479 the core initialisation has finished and the node list
480 has been gathered */
481 if (clops->cluster_init_completed)
482 clops->cluster_init_completed();
483
484 DEBUGLOG("clvmd ready for work\n");
485 child_init_signal(SUCCESS);
486
487 /* Try to shutdown neatly */
488 signal(SIGTERM, sigterm_handler);
489 signal(SIGINT, sigterm_handler);
490
491 /* Do some work */
492 main_loop(local_sock, cmd_timeout);
493
494 destroy_lvm();
495
496 return 0;
497 }
498
/* Called when the GuLM cluster layer has completed initialisation.
   We send the version message.
   Takes no arguments; declared with (void) so the definition provides a
   real prototype. */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
505
/*
 * fd-list callback for a connected local client socket.
 *
 * The data is handled by read_from_local_sock(), whose result is
 * returned unchanged.  This callback never creates a client, so
 * *new_client is always set to NULL.  buf, len and csid are unused.
 */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	int rc;

	*new_client = NULL;
	rc = read_from_local_sock(thisfd);

	return rc;
}
514
515 /* Data on a connected socket */
local_rendezvous_callback(struct local_client * thisfd,char * buf,int len,const char * csid,struct local_client ** new_client)516 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
517 int len, const char *csid,
518 struct local_client **new_client)
519 {
520 /* Someone connected to our local socket, accept it. */
521
522 struct sockaddr_un socka;
523 struct local_client *newfd;
524 socklen_t sl = sizeof(socka);
525 int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
526
527 if (client_fd == -1 && errno == EINTR)
528 return 1;
529
530 if (client_fd >= 0) {
531 newfd = malloc(sizeof(struct local_client));
532 if (!newfd) {
533 close(client_fd);
534 return 1;
535 }
536 newfd->fd = client_fd;
537 newfd->type = LOCAL_SOCK;
538 newfd->xid = 0;
539 newfd->removeme = 0;
540 newfd->callback = local_sock_callback;
541 newfd->bits.localsock.replies = NULL;
542 newfd->bits.localsock.expected_replies = 0;
543 newfd->bits.localsock.cmd = NULL;
544 newfd->bits.localsock.in_progress = FALSE;
545 newfd->bits.localsock.sent_out = FALSE;
546 newfd->bits.localsock.threadid = 0;
547 newfd->bits.localsock.finished = 0;
548 newfd->bits.localsock.pipe_client = NULL;
549 newfd->bits.localsock.private = NULL;
550 newfd->bits.localsock.all_success = 1;
551 DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
552 *new_client = newfd;
553 }
554 return 1;
555 }
556
local_pipe_callback(struct local_client * thisfd,char * buf,int maxlen,const char * csid,struct local_client ** new_client)557 static int local_pipe_callback(struct local_client *thisfd, char *buf,
558 int maxlen, const char *csid,
559 struct local_client **new_client)
560 {
561 int len;
562 char buffer[PIPE_BUF];
563 struct local_client *sock_client = thisfd->bits.pipe.client;
564 int status = -1; /* in error by default */
565
566 len = read(thisfd->fd, buffer, sizeof(int));
567 if (len == -1 && errno == EINTR)
568 return 1;
569
570 if (len == sizeof(int)) {
571 memcpy(&status, buffer, sizeof(int));
572 }
573
574 DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
575 thisfd->fd, len, status);
576
577 /* EOF on pipe or an error, close it */
578 if (len <= 0) {
579 int jstat;
580 void *ret = &status;
581 close(thisfd->fd);
582
583 /* Clear out the cross-link */
584 if (thisfd->bits.pipe.client != NULL)
585 thisfd->bits.pipe.client->bits.localsock.pipe_client =
586 NULL;
587
588 /* Reap child thread */
589 if (thisfd->bits.pipe.threadid) {
590 jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
591 thisfd->bits.pipe.threadid = 0;
592 if (thisfd->bits.pipe.client != NULL)
593 thisfd->bits.pipe.client->bits.localsock.
594 threadid = 0;
595 }
596 return -1;
597 } else {
598 DEBUGLOG("background routine status was %d, sock_client=%p\n",
599 status, sock_client);
600 /* But has the client gone away ?? */
601 if (sock_client == NULL) {
602 DEBUGLOG
603 ("Got PIPE response for dead client, ignoring it\n");
604 } else {
605 /* If error then just return that code */
606 if (status)
607 send_local_reply(sock_client, status,
608 sock_client->fd);
609 else {
610 if (sock_client->bits.localsock.state ==
611 POST_COMMAND) {
612 send_local_reply(sock_client, 0,
613 sock_client->fd);
614 } else // PRE_COMMAND finished.
615 {
616 if (
617 (status =
618 distribute_command(sock_client)) !=
619 0) send_local_reply(sock_client,
620 EFBIG,
621 sock_client->
622 fd);
623 }
624 }
625 }
626 }
627 return len;
628 }
629
630 /* If a noed is up, look for it in the reply array, if it's not there then
631 add one with "ETIMEDOUT".
632 NOTE: This won't race with real replies because they happen in the same thread.
633 */
timedout_callback(struct local_client * client,const char * csid,int node_up)634 static void timedout_callback(struct local_client *client, const char *csid,
635 int node_up)
636 {
637 if (node_up) {
638 struct node_reply *reply;
639 char nodename[max_cluster_member_name_len];
640
641 clops->name_from_csid(csid, nodename);
642 DEBUGLOG("Checking for a reply from %s\n", nodename);
643 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
644
645 reply = client->bits.localsock.replies;
646 while (reply && strcmp(reply->node, nodename) != 0) {
647 reply = reply->next;
648 }
649
650 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
651
652 if (!reply) {
653 DEBUGLOG("Node %s timed-out\n", nodename);
654 add_reply_to_list(client, ETIMEDOUT, csid,
655 "Command timed out", 18);
656 }
657 }
658 }
659
660 /* Called when the request has timed out on at least one node. We fill in
661 the remaining node entries with ETIMEDOUT and return.
662
663 By the time we get here the node that caused
664 the timeout could have gone down, in which case we will never get the expected
665 number of replies that triggers the post command so we need to do it here
666 */
request_timed_out(struct local_client * client)667 static void request_timed_out(struct local_client *client)
668 {
669 DEBUGLOG("Request timed-out. padding\n");
670 clops->cluster_do_node_callback(client, timedout_callback);
671
672 if (client->bits.localsock.num_replies !=
673 client->bits.localsock.expected_replies) {
674 /* Post-process the command */
675 if (client->bits.localsock.threadid) {
676 pthread_mutex_lock(&client->bits.localsock.mutex);
677 client->bits.localsock.state = POST_COMMAND;
678 pthread_cond_signal(&client->bits.localsock.cond);
679 pthread_mutex_unlock(&client->bits.localsock.mutex);
680 }
681 }
682 }
683
684 /* This is where the real work happens */
main_loop(int local_sock,int cmd_timeout)685 static void main_loop(int local_sock, int cmd_timeout)
686 {
687 DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
688
689 sigset_t ss;
690 sigemptyset(&ss);
691 sigaddset(&ss, SIGINT);
692 sigaddset(&ss, SIGTERM);
693 pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
694 /* Main loop */
695 while (!quit) {
696 fd_set in;
697 int select_status;
698 struct local_client *thisfd;
699 struct timeval tv = { cmd_timeout, 0 };
700 int quorate = clops->is_quorate();
701
702 /* Wait on the cluster FD and all local sockets/pipes */
703 local_client_head.fd = clops->get_main_cluster_fd();
704 FD_ZERO(&in);
705 for (thisfd = &local_client_head; thisfd != NULL;
706 thisfd = thisfd->next) {
707
708 if (thisfd->removeme)
709 continue;
710
711 /* if the cluster is not quorate then don't listen for new requests */
712 if ((thisfd->type != LOCAL_RENDEZVOUS &&
713 thisfd->type != LOCAL_SOCK) || quorate)
714 FD_SET(thisfd->fd, &in);
715 }
716
717 select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
718
719 if (reread_config) {
720 int saved_errno = errno;
721
722 reread_config = 0;
723 if (clops->reread_config)
724 clops->reread_config();
725 errno = saved_errno;
726 }
727
728 if (select_status > 0) {
729 struct local_client *lastfd = NULL;
730 char csid[MAX_CSID_LEN];
731 char buf[max_cluster_message];
732
733 for (thisfd = &local_client_head; thisfd != NULL;
734 thisfd = thisfd->next) {
735
736 if (thisfd->removeme) {
737 struct local_client *free_fd;
738 lastfd->next = thisfd->next;
739 free_fd = thisfd;
740 thisfd = lastfd;
741
742 DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
743
744 /* Queue cleanup, this also frees the client struct */
745 add_to_lvmqueue(free_fd, NULL, 0, NULL);
746 break;
747 }
748
749 if (FD_ISSET(thisfd->fd, &in)) {
750 struct local_client *newfd = NULL;
751 int ret;
752
753 /* Do callback */
754 ret =
755 thisfd->callback(thisfd, buf,
756 sizeof(buf), csid,
757 &newfd);
758 /* Ignore EAGAIN */
759 if (ret < 0 && (errno == EAGAIN ||
760 errno == EINTR)) continue;
761
762 /* Got error or EOF: Remove it from the list safely */
763 if (ret <= 0) {
764 struct local_client *free_fd;
765 int type = thisfd->type;
766
767 /* If the cluster socket shuts down, so do we */
768 if (type == CLUSTER_MAIN_SOCK ||
769 type == CLUSTER_INTERNAL)
770 goto closedown;
771
772 DEBUGLOG("ret == %d, errno = %d. removing client\n",
773 ret, errno);
774 lastfd->next = thisfd->next;
775 free_fd = thisfd;
776 thisfd = lastfd;
777 close(free_fd->fd);
778
779 /* Queue cleanup, this also frees the client struct */
780 add_to_lvmqueue(free_fd, NULL, 0, NULL);
781 break;
782 }
783
784 /* New client...simply add it to the list */
785 if (newfd) {
786 newfd->next = thisfd->next;
787 thisfd->next = newfd;
788 break;
789 }
790 }
791 lastfd = thisfd;
792 }
793 }
794
795 /* Select timed out. Check for clients that have been waiting too long for a response */
796 if (select_status == 0) {
797 time_t the_time = time(NULL);
798
799 for (thisfd = &local_client_head; thisfd != NULL;
800 thisfd = thisfd->next) {
801 if (thisfd->type == LOCAL_SOCK
802 && thisfd->bits.localsock.sent_out
803 && thisfd->bits.localsock.sent_time +
804 cmd_timeout < the_time
805 && thisfd->bits.localsock.
806 expected_replies !=
807 thisfd->bits.localsock.num_replies) {
808 /* Send timed out message + replies we already have */
809 DEBUGLOG
810 ("Request timed-out (send: %ld, now: %ld)\n",
811 thisfd->bits.localsock.sent_time,
812 the_time);
813
814 thisfd->bits.localsock.all_success = 0;
815
816 request_timed_out(thisfd);
817 }
818 }
819 }
820 if (select_status < 0) {
821 if (errno == EINTR)
822 continue;
823
824 #ifdef DEBUG
825 perror("select error");
826 exit(-1);
827 #endif
828 }
829 }
830
831 closedown:
832 clops->cluster_closedown();
833 close(local_sock);
834 }
835
wait_for_child(int c_pipe,int timeout)836 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
837 {
838 int child_status;
839 int sstat;
840 fd_set fds;
841 struct timeval tv = {timeout, 0};
842
843 FD_ZERO(&fds);
844 FD_SET(c_pipe, &fds);
845
846 sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
847 if (sstat == 0) {
848 fprintf(stderr, "clvmd startup timed out\n");
849 exit(DFAIL_TIMEOUT);
850 }
851 if (sstat == 1) {
852 if (read(c_pipe, &child_status, sizeof(child_status)) !=
853 sizeof(child_status)) {
854
855 fprintf(stderr, "clvmd failed in initialisation\n");
856 exit(DFAIL_INIT);
857 }
858 else {
859 switch (child_status) {
860 case SUCCESS:
861 break;
862 case DFAIL_INIT:
863 fprintf(stderr, "clvmd failed in initialisation\n");
864 break;
865 case DFAIL_LOCAL_SOCK:
866 fprintf(stderr, "clvmd could not create local socket\n");
867 fprintf(stderr, "Another clvmd is probably already running\n");
868 break;
869 case DFAIL_CLUSTER_IF:
870 fprintf(stderr, "clvmd could not connect to cluster manager\n");
871 fprintf(stderr, "Consult syslog for more information\n");
872 break;
873 case DFAIL_MALLOC:
874 fprintf(stderr, "clvmd failed, not enough memory\n");
875 break;
876 default:
877 fprintf(stderr, "clvmd failed, error was %d\n", child_status);
878 break;
879 }
880 exit(child_status);
881 }
882 }
883 fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
884 exit(DFAIL_INIT);
885 }
886
887 /*
888 * Fork into the background and detach from our parent process.
889 * In the interests of user-friendliness we wait for the daemon
890 * to complete initialisation before returning its status
891 * the the user.
892 */
be_daemon(int timeout)893 static void be_daemon(int timeout)
894 {
895 pid_t pid;
896 int devnull = open("/dev/null", O_RDWR);
897 if (devnull == -1) {
898 perror("Can't open /dev/null");
899 exit(3);
900 }
901
902 pipe(child_pipe);
903
904 switch (pid = fork()) {
905 case -1:
906 perror("clvmd: can't fork");
907 exit(2);
908
909 case 0: /* Child */
910 close(child_pipe[0]);
911 break;
912
913 default: /* Parent */
914 close(child_pipe[1]);
915 wait_for_child(child_pipe[0], timeout);
916 }
917
918 /* Detach ourself from the calling environment */
919 if (close(0) || close(1) || close(2)) {
920 perror("Error closing terminal FDs");
921 exit(4);
922 }
923 setsid();
924
925 if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
926 || dup2(devnull, 2) < 0) {
927 int e = errno;
928 perror("Error setting terminal FDs to /dev/null");
929 log_error("Error setting terminal FDs to /dev/null: %s", strerror(e));
930 exit(5);
931 }
932 if (chdir("/")) {
933 log_error("Error setting current directory to /: %s", strerror(e));
934 exit(6);
935 }
936
937 }
938
939 /* Called when we have a read from the local socket.
940 was in the main loop but it's grown up and is a big girl now */
read_from_local_sock(struct local_client * thisfd)941 static int read_from_local_sock(struct local_client *thisfd)
942 {
943 int len;
944 int argslen;
945 int missing_len;
946 char buffer[PIPE_BUF];
947
948 len = read(thisfd->fd, buffer, sizeof(buffer));
949 if (len == -1 && errno == EINTR)
950 return 1;
951
952 DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
953
954 /* EOF or error on socket */
955 if (len <= 0) {
956 int *status;
957 int jstat;
958
959 DEBUGLOG("EOF on local socket: inprogress=%d\n",
960 thisfd->bits.localsock.in_progress);
961
962 thisfd->bits.localsock.finished = 1;
963
964 /* If the client went away in mid command then tidy up */
965 if (thisfd->bits.localsock.in_progress) {
966 pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
967 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
968 thisfd->bits.localsock.state = POST_COMMAND;
969 pthread_cond_signal(&thisfd->bits.localsock.cond);
970 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
971
972 /* Free any unsent buffers */
973 free_reply(thisfd);
974 }
975
976 /* Kill the subthread & free resources */
977 if (thisfd->bits.localsock.threadid) {
978 DEBUGLOG("Waiting for child thread\n");
979 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
980 thisfd->bits.localsock.state = PRE_COMMAND;
981 pthread_cond_signal(&thisfd->bits.localsock.cond);
982 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
983
984 jstat =
985 pthread_join(thisfd->bits.localsock.threadid,
986 (void **) &status);
987 DEBUGLOG("Joined child thread\n");
988
989 thisfd->bits.localsock.threadid = 0;
990 pthread_cond_destroy(&thisfd->bits.localsock.cond);
991 pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
992
993 /* Remove the pipe client */
994 if (thisfd->bits.localsock.pipe_client != NULL) {
995 struct local_client *newfd;
996 struct local_client *lastfd = NULL;
997 struct local_client *free_fd = NULL;
998
999 close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */
1000 close(thisfd->bits.localsock.pipe);
1001
1002 /* Remove pipe client */
1003 for (newfd = &local_client_head; newfd != NULL;
1004 newfd = newfd->next) {
1005 if (thisfd->bits.localsock.
1006 pipe_client == newfd) {
1007 thisfd->bits.localsock.
1008 pipe_client = NULL;
1009
1010 lastfd->next = newfd->next;
1011 free_fd = newfd;
1012 newfd->next = lastfd;
1013 free(free_fd);
1014 break;
1015 }
1016 lastfd = newfd;
1017 }
1018 }
1019 }
1020
1021 /* Free the command buffer */
1022 free(thisfd->bits.localsock.cmd);
1023
1024 /* Clear out the cross-link */
1025 if (thisfd->bits.localsock.pipe_client != NULL)
1026 thisfd->bits.localsock.pipe_client->bits.pipe.client =
1027 NULL;
1028
1029 close(thisfd->fd);
1030 return 0;
1031 } else {
1032 int comms_pipe[2];
1033 struct local_client *newfd;
1034 char csid[MAX_CSID_LEN];
1035 struct clvm_header *inheader;
1036 int status;
1037
1038 inheader = (struct clvm_header *) buffer;
1039
1040 /* Fill in the client ID */
1041 inheader->clientid = htonl(thisfd->fd);
1042
1043 /* If we are already busy then return an error */
1044 if (thisfd->bits.localsock.in_progress) {
1045 struct clvm_header reply;
1046 reply.cmd = CLVMD_CMD_REPLY;
1047 reply.status = EBUSY;
1048 reply.arglen = 0;
1049 reply.flags = 0;
1050 send_message(&reply, sizeof(reply), our_csid,
1051 thisfd->fd,
1052 "Error sending EBUSY reply to local user");
1053 return len;
1054 }
1055
1056 /* Free any old buffer space */
1057 free(thisfd->bits.localsock.cmd);
1058
1059 /* See if we have the whole message */
1060 argslen =
1061 len - strlen(inheader->node) - sizeof(struct clvm_header);
1062 missing_len = inheader->arglen - argslen;
1063
1064 if (missing_len < 0)
1065 missing_len = 0;
1066
1067 /* Save the message */
1068 thisfd->bits.localsock.cmd = malloc(len + missing_len);
1069
1070 if (!thisfd->bits.localsock.cmd) {
1071 struct clvm_header reply;
1072 reply.cmd = CLVMD_CMD_REPLY;
1073 reply.status = ENOMEM;
1074 reply.arglen = 0;
1075 reply.flags = 0;
1076 send_message(&reply, sizeof(reply), our_csid,
1077 thisfd->fd,
1078 "Error sending ENOMEM reply to local user");
1079 return 0;
1080 }
1081 memcpy(thisfd->bits.localsock.cmd, buffer, len);
1082 thisfd->bits.localsock.cmd_len = len + missing_len;
1083 inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1084
1085 /* If we don't have the full message then read the rest now */
1086 if (missing_len) {
1087 char *argptr =
1088 inheader->node + strlen(inheader->node) + 1;
1089
1090 while (missing_len > 0 && len >= 0) {
1091 DEBUGLOG
1092 ("got %d bytes, need another %d (total %d)\n",
1093 argslen, missing_len, inheader->arglen);
1094 len = read(thisfd->fd, argptr + argslen,
1095 missing_len);
1096 if (len >= 0) {
1097 missing_len -= len;
1098 argslen += len;
1099 }
1100 }
1101 }
1102
1103 /* Initialise and lock the mutex so the subthread will wait after
1104 finishing the PRE routine */
1105 if (!thisfd->bits.localsock.threadid) {
1106 pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1107 pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1108 pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1109 }
1110
1111 /* Only run the command if all the cluster nodes are running CLVMD */
1112 if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1113 (check_all_clvmds_running(thisfd) == -1)) {
1114 thisfd->bits.localsock.expected_replies = 0;
1115 thisfd->bits.localsock.num_replies = 0;
1116 send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1117 return len;
1118 }
1119
1120 /* Check the node name for validity */
1121 if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1122 /* Error, node is not in the cluster */
1123 struct clvm_header reply;
1124 DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1125
1126 reply.cmd = CLVMD_CMD_REPLY;
1127 reply.status = ENOENT;
1128 reply.flags = 0;
1129 reply.arglen = 0;
1130 send_message(&reply, sizeof(reply), our_csid,
1131 thisfd->fd,
1132 "Error sending ENOENT reply to local user");
1133 thisfd->bits.localsock.expected_replies = 0;
1134 thisfd->bits.localsock.num_replies = 0;
1135 thisfd->bits.localsock.in_progress = FALSE;
1136 thisfd->bits.localsock.sent_out = FALSE;
1137 return len;
1138 }
1139
1140 /* If we already have a subthread then just signal it to start */
1141 if (thisfd->bits.localsock.threadid) {
1142 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1143 thisfd->bits.localsock.state = PRE_COMMAND;
1144 pthread_cond_signal(&thisfd->bits.localsock.cond);
1145 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1146 return len;
1147 }
1148
1149 /* Create a pipe and add the reading end to our FD list */
1150 pipe(comms_pipe);
1151 newfd = malloc(sizeof(struct local_client));
1152 if (!newfd) {
1153 struct clvm_header reply;
1154 close(comms_pipe[0]);
1155 close(comms_pipe[1]);
1156
1157 reply.cmd = CLVMD_CMD_REPLY;
1158 reply.status = ENOMEM;
1159 reply.arglen = 0;
1160 reply.flags = 0;
1161 send_message(&reply, sizeof(reply), our_csid,
1162 thisfd->fd,
1163 "Error sending ENOMEM reply to local user");
1164 return len;
1165 }
1166 DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1167 comms_pipe[1]);
1168 newfd->fd = comms_pipe[0];
1169 newfd->removeme = 0;
1170 newfd->type = THREAD_PIPE;
1171 newfd->callback = local_pipe_callback;
1172 newfd->next = thisfd->next;
1173 newfd->bits.pipe.client = thisfd;
1174 newfd->bits.pipe.threadid = 0;
1175 thisfd->next = newfd;
1176
1177 /* Store a cross link to the pipe */
1178 thisfd->bits.localsock.pipe_client = newfd;
1179
1180 thisfd->bits.localsock.pipe = comms_pipe[1];
1181
1182 /* Make sure the thread has a copy of its own ID */
1183 newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1184
1185 /* Run the pre routine */
1186 thisfd->bits.localsock.in_progress = TRUE;
1187 thisfd->bits.localsock.state = PRE_COMMAND;
1188 DEBUGLOG("Creating pre&post thread\n");
1189 status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1190 pre_and_post_thread, thisfd);
1191 DEBUGLOG("Created pre&post thread, state = %d\n", status);
1192 }
1193 return len;
1194 }
1195
1196 /* Add a file descriptor from the cluster or comms interface to
1197 our list of FDs for select
1198 */
add_client(struct local_client * new_client)1199 int add_client(struct local_client *new_client)
1200 {
1201 new_client->next = local_client_head.next;
1202 local_client_head.next = new_client;
1203
1204 return 0;
1205 }
1206
1207 /* Called when the pre-command has completed successfully - we
1208 now execute the real command on all the requested nodes */
distribute_command(struct local_client * thisfd)1209 static int distribute_command(struct local_client *thisfd)
1210 {
1211 struct clvm_header *inheader =
1212 (struct clvm_header *) thisfd->bits.localsock.cmd;
1213 int len = thisfd->bits.localsock.cmd_len;
1214
1215 thisfd->xid = global_xid++;
1216 DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1217
1218 /* Forward it to other nodes in the cluster if needed */
1219 if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1220 /* if node is empty then do it on the whole cluster */
1221 if (inheader->node[0] == '\0') {
1222 thisfd->bits.localsock.expected_replies =
1223 clops->get_num_nodes();
1224 thisfd->bits.localsock.num_replies = 0;
1225 thisfd->bits.localsock.sent_time = time(NULL);
1226 thisfd->bits.localsock.in_progress = TRUE;
1227 thisfd->bits.localsock.sent_out = TRUE;
1228
1229 /* Do it here first */
1230 add_to_lvmqueue(thisfd, inheader, len, NULL);
1231
1232 DEBUGLOG("Sending message to all cluster nodes\n");
1233 inheader->xid = thisfd->xid;
1234 send_message(inheader, len, NULL, -1,
1235 "Error forwarding message to cluster");
1236 } else {
1237 /* Do it on a single node */
1238 char csid[MAX_CSID_LEN];
1239
1240 if (clops->csid_from_name(csid, inheader->node)) {
1241 /* This has already been checked so should not happen */
1242 return 0;
1243 } else {
1244 /* OK, found a node... */
1245 thisfd->bits.localsock.expected_replies = 1;
1246 thisfd->bits.localsock.num_replies = 0;
1247 thisfd->bits.localsock.in_progress = TRUE;
1248
1249 /* Are we the requested node ?? */
1250 if (memcmp(csid, our_csid, max_csid_len) == 0) {
1251 DEBUGLOG("Doing command on local node only\n");
1252 add_to_lvmqueue(thisfd, inheader, len, NULL);
1253 } else {
1254 DEBUGLOG("Sending message to single node: %s\n",
1255 inheader->node);
1256 inheader->xid = thisfd->xid;
1257 send_message(inheader, len,
1258 csid, -1,
1259 "Error forwarding message to cluster node");
1260 }
1261 }
1262 }
1263 } else {
1264 /* Local explicitly requested, ignore nodes */
1265 thisfd->bits.localsock.in_progress = TRUE;
1266 thisfd->bits.localsock.expected_replies = 1;
1267 thisfd->bits.localsock.num_replies = 0;
1268 add_to_lvmqueue(thisfd, inheader, len, NULL);
1269 }
1270 return 0;
1271 }
1272
1273 /* Process a command from a remote node and return the result */
process_remote_command(struct clvm_header * msg,int msglen,int fd,const char * csid)1274 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1275 const char *csid)
1276 {
1277 char *replyargs;
1278 char nodename[max_cluster_member_name_len];
1279 int replylen = 0;
1280 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1281 int status;
1282 int msg_malloced = 0;
1283
1284 /* Get the node name as we /may/ need it later */
1285 clops->name_from_csid(csid, nodename);
1286
1287 DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1288 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1289
1290 /* Check for GOAWAY and sulk */
1291 if (msg->cmd == CLVMD_CMD_GOAWAY) {
1292
1293 DEBUGLOG("Told to go away by %s\n", nodename);
1294 log_error("Told to go away by %s\n", nodename);
1295 exit(99);
1296 }
1297
1298 /* Version check is internal - don't bother exposing it in
1299 clvmd-command.c */
1300 if (msg->cmd == CLVMD_CMD_VERSION) {
1301 int version_nums[3];
1302 char node[256];
1303
1304 memcpy(version_nums, msg->args, sizeof(version_nums));
1305
1306 clops->name_from_csid(csid, node);
1307 DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1308 node,
1309 ntohl(version_nums[0]),
1310 ntohl(version_nums[1]), ntohl(version_nums[2]));
1311
1312 if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1313 struct clvm_header byebyemsg;
1314 DEBUGLOG
1315 ("Telling node %s to go away because of incompatible version number\n",
1316 node);
1317 log_notice
1318 ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1319 node, ntohl(version_nums[0]),
1320 ntohl(version_nums[1]), ntohl(version_nums[2]));
1321
1322 byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1323 byebyemsg.status = 0;
1324 byebyemsg.flags = 0;
1325 byebyemsg.arglen = 0;
1326 byebyemsg.clientid = 0;
1327 clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1328 our_csid,
1329 "Error Sending GOAWAY message");
1330 } else {
1331 clops->add_up_node(csid);
1332 }
1333 return;
1334 }
1335
1336 /* Allocate a default reply buffer */
1337 replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1338
1339 if (replyargs != NULL) {
1340 /* Run the command */
1341 status =
1342 do_command(NULL, msg, msglen, &replyargs, buflen,
1343 &replylen);
1344 } else {
1345 status = ENOMEM;
1346 }
1347
1348 /* If it wasn't a reply, then reply */
1349 if (msg->cmd != CLVMD_CMD_REPLY) {
1350 char *aggreply;
1351
1352 aggreply =
1353 realloc(replyargs, replylen + sizeof(struct clvm_header));
1354 if (aggreply) {
1355 struct clvm_header *agghead =
1356 (struct clvm_header *) aggreply;
1357
1358 replyargs = aggreply;
1359 /* Move it up so there's room for a header in front of the data */
1360 memmove(aggreply + offsetof(struct clvm_header, args),
1361 replyargs, replylen);
1362
1363 agghead->xid = msg->xid;
1364 agghead->cmd = CLVMD_CMD_REPLY;
1365 agghead->status = status;
1366 agghead->flags = 0;
1367 agghead->clientid = msg->clientid;
1368 agghead->arglen = replylen;
1369 agghead->node[0] = '\0';
1370 send_message(aggreply,
1371 sizeof(struct clvm_header) +
1372 replylen, csid, fd,
1373 "Error sending command reply");
1374 } else {
1375 struct clvm_header head;
1376
1377 DEBUGLOG("Error attempting to realloc return buffer\n");
1378 /* Return a failure response */
1379 head.cmd = CLVMD_CMD_REPLY;
1380 head.status = ENOMEM;
1381 head.flags = 0;
1382 head.clientid = msg->clientid;
1383 head.arglen = 0;
1384 head.node[0] = '\0';
1385 send_message(&head, sizeof(struct clvm_header), csid,
1386 fd, "Error sending ENOMEM command reply");
1387 return;
1388 }
1389 }
1390
1391 /* Free buffer if it was malloced */
1392 if (msg_malloced) {
1393 free(msg);
1394 }
1395 free(replyargs);
1396 }
1397
1398 /* Add a reply to a command to the list of replies for this client.
1399 If we have got a full set then send them to the waiting client down the local
1400 socket */
add_reply_to_list(struct local_client * client,int status,const char * csid,const char * buf,int len)1401 static void add_reply_to_list(struct local_client *client, int status,
1402 const char *csid, const char *buf, int len)
1403 {
1404 struct node_reply *reply;
1405
1406 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1407
1408 /* Add it to the list of replies */
1409 reply = malloc(sizeof(struct node_reply));
1410 if (reply) {
1411 reply->status = status;
1412 clops->name_from_csid(csid, reply->node);
1413 DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1414
1415 if (len > 0) {
1416 reply->replymsg = malloc(len);
1417 if (!reply->replymsg) {
1418 reply->status = ENOMEM;
1419 } else {
1420 memcpy(reply->replymsg, buf, len);
1421 }
1422 } else {
1423 reply->replymsg = NULL;
1424 }
1425 /* Hook it onto the reply chain */
1426 reply->next = client->bits.localsock.replies;
1427 client->bits.localsock.replies = reply;
1428 } else {
1429 /* It's all gone horribly wrong... */
1430 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1431 send_local_reply(client, ENOMEM, client->fd);
1432 return;
1433 }
1434 DEBUGLOG("Got %d replies, expecting: %d\n",
1435 client->bits.localsock.num_replies + 1,
1436 client->bits.localsock.expected_replies);
1437
1438 /* If we have the whole lot then do the post-process */
1439 if (++client->bits.localsock.num_replies ==
1440 client->bits.localsock.expected_replies) {
1441 /* Post-process the command */
1442 if (client->bits.localsock.threadid) {
1443 pthread_mutex_lock(&client->bits.localsock.mutex);
1444 client->bits.localsock.state = POST_COMMAND;
1445 pthread_cond_signal(&client->bits.localsock.cond);
1446 pthread_mutex_unlock(&client->bits.localsock.mutex);
1447 }
1448 }
1449 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1450 }
1451
1452 /* This is the thread that runs the PRE and post commands for a particular connection */
pre_and_post_thread(void * arg)1453 static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
1454 {
1455 struct local_client *client = (struct local_client *) arg;
1456 int status;
1457 int write_status;
1458 sigset_t ss;
1459 int pipe_fd = client->bits.localsock.pipe;
1460
1461 DEBUGLOG("in sub thread: client = %p\n", client);
1462
1463 /* Don't start until the LVM thread is ready */
1464 pthread_mutex_lock(&lvm_start_mutex);
1465 pthread_mutex_unlock(&lvm_start_mutex);
1466 DEBUGLOG("Sub thread ready for work.\n");
1467
1468 /* Ignore SIGUSR1 (handled by master process) but enable
1469 SIGUSR2 (kills subthreads) */
1470 sigemptyset(&ss);
1471 sigaddset(&ss, SIGUSR1);
1472 pthread_sigmask(SIG_BLOCK, &ss, NULL);
1473
1474 sigdelset(&ss, SIGUSR1);
1475 sigaddset(&ss, SIGUSR2);
1476 pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
1477
1478 /* Loop around doing PRE and POST functions until the client goes away */
1479 while (!client->bits.localsock.finished) {
1480 /* Execute the code */
1481 status = do_pre_command(client);
1482
1483 if (status)
1484 client->bits.localsock.all_success = 0;
1485
1486 DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
1487
1488 /* Tell the parent process we have finished this bit */
1489 do {
1490 write_status = write(pipe_fd, &status, sizeof(int));
1491 if (write_status == sizeof(int))
1492 break;
1493 if (write_status < 0 &&
1494 (errno == EINTR || errno == EAGAIN))
1495 continue;
1496 log_error("Error sending to pipe: %s\n", strerror(errno));
1497 break;
1498 } while(1);
1499
1500 if (status) {
1501 client->bits.localsock.state = POST_COMMAND;
1502 goto next_pre;
1503 }
1504
1505 /* We may need to wait for the condition variable before running the post command */
1506 pthread_mutex_lock(&client->bits.localsock.mutex);
1507 DEBUGLOG("Waiting to do post command - state = %d\n",
1508 client->bits.localsock.state);
1509
1510 if (client->bits.localsock.state != POST_COMMAND) {
1511 pthread_cond_wait(&client->bits.localsock.cond,
1512 &client->bits.localsock.mutex);
1513 }
1514 pthread_mutex_unlock(&client->bits.localsock.mutex);
1515
1516 DEBUGLOG("Got post command condition...\n");
1517
1518 /* POST function must always run, even if the client aborts */
1519 status = 0;
1520 do_post_command(client);
1521
1522 do {
1523 write_status = write(pipe_fd, &status, sizeof(int));
1524 if (write_status == sizeof(int))
1525 break;
1526 if (write_status < 0 &&
1527 (errno == EINTR || errno == EAGAIN))
1528 continue;
1529 log_error("Error sending to pipe: %s\n", strerror(errno));
1530 break;
1531 } while(1);
1532 next_pre:
1533 DEBUGLOG("Waiting for next pre command\n");
1534
1535 pthread_mutex_lock(&client->bits.localsock.mutex);
1536 if (client->bits.localsock.state != PRE_COMMAND &&
1537 !client->bits.localsock.finished) {
1538 pthread_cond_wait(&client->bits.localsock.cond,
1539 &client->bits.localsock.mutex);
1540 }
1541 pthread_mutex_unlock(&client->bits.localsock.mutex);
1542
1543 DEBUGLOG("Got pre command condition...\n");
1544 }
1545 DEBUGLOG("Subthread finished\n");
1546 pthread_exit((void *) 0);
1547 }
1548
1549 /* Process a command on the local node and store the result */
process_local_command(struct clvm_header * msg,int msglen,struct local_client * client,unsigned short xid)1550 static int process_local_command(struct clvm_header *msg, int msglen,
1551 struct local_client *client,
1552 unsigned short xid)
1553 {
1554 char *replybuf = malloc(max_cluster_message);
1555 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1556 int replylen = 0;
1557 int status;
1558
1559 DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1560 decode_cmd(msg->cmd), msg, msglen, client);
1561
1562 if (replybuf == NULL)
1563 return -1;
1564
1565 status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1566
1567 if (status)
1568 client->bits.localsock.all_success = 0;
1569
1570 /* If we took too long then discard the reply */
1571 if (xid == client->xid) {
1572 add_reply_to_list(client, status, our_csid, replybuf, replylen);
1573 } else {
1574 DEBUGLOG
1575 ("Local command took too long, discarding xid %d, current is %d\n",
1576 xid, client->xid);
1577 }
1578
1579 free(replybuf);
1580 return status;
1581 }
1582
process_reply(const struct clvm_header * msg,int msglen,const char * csid)1583 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1584 {
1585 struct local_client *client = NULL;
1586
1587 client = find_client(msg->clientid);
1588 if (!client) {
1589 DEBUGLOG("Got message for unknown client 0x%x\n",
1590 msg->clientid);
1591 log_error("Got message for unknown client 0x%x\n",
1592 msg->clientid);
1593 return -1;
1594 }
1595
1596 if (msg->status)
1597 client->bits.localsock.all_success = 0;
1598
1599 /* Gather replies together for this client id */
1600 if (msg->xid == client->xid) {
1601 add_reply_to_list(client, msg->status, csid, msg->args,
1602 msg->arglen);
1603 } else {
1604 DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1605 msg->xid, client->xid);
1606 }
1607 return 0;
1608 }
1609
1610 /* Send an aggregated reply back to the client */
send_local_reply(struct local_client * client,int status,int fd)1611 static void send_local_reply(struct local_client *client, int status, int fd)
1612 {
1613 struct clvm_header *clientreply;
1614 struct node_reply *thisreply = client->bits.localsock.replies;
1615 char *replybuf;
1616 char *ptr;
1617 int message_len = 0;
1618
1619 DEBUGLOG("Send local reply\n");
1620
1621 /* Work out the total size of the reply */
1622 while (thisreply) {
1623 if (thisreply->replymsg)
1624 message_len += strlen(thisreply->replymsg) + 1;
1625 else
1626 message_len++;
1627
1628 message_len += strlen(thisreply->node) + 1 + sizeof(int);
1629
1630 thisreply = thisreply->next;
1631 }
1632
1633 /* Add in the size of our header */
1634 message_len = message_len + sizeof(struct clvm_header) + 1;
1635 replybuf = malloc(message_len);
1636
1637 clientreply = (struct clvm_header *) replybuf;
1638 clientreply->status = status;
1639 clientreply->cmd = CLVMD_CMD_REPLY;
1640 clientreply->node[0] = '\0';
1641 clientreply->flags = 0;
1642
1643 ptr = clientreply->args;
1644
1645 /* Add in all the replies, and free them as we go */
1646 thisreply = client->bits.localsock.replies;
1647 while (thisreply) {
1648 struct node_reply *tempreply = thisreply;
1649
1650 strcpy(ptr, thisreply->node);
1651 ptr += strlen(thisreply->node) + 1;
1652
1653 if (thisreply->status)
1654 clientreply->flags |= CLVMD_FLAG_NODEERRS;
1655
1656 memcpy(ptr, &thisreply->status, sizeof(int));
1657 ptr += sizeof(int);
1658
1659 if (thisreply->replymsg) {
1660 strcpy(ptr, thisreply->replymsg);
1661 ptr += strlen(thisreply->replymsg) + 1;
1662 } else {
1663 ptr[0] = '\0';
1664 ptr++;
1665 }
1666 thisreply = thisreply->next;
1667
1668 free(tempreply->replymsg);
1669 free(tempreply);
1670 }
1671
1672 /* Terminate with an empty node name */
1673 *ptr = '\0';
1674
1675 clientreply->arglen = ptr - clientreply->args + 1;
1676
1677 /* And send it */
1678 send_message(replybuf, message_len, our_csid, fd,
1679 "Error sending REPLY to client");
1680 free(replybuf);
1681
1682 /* Reset comms variables */
1683 client->bits.localsock.replies = NULL;
1684 client->bits.localsock.expected_replies = 0;
1685 client->bits.localsock.in_progress = FALSE;
1686 client->bits.localsock.sent_out = FALSE;
1687 }
1688
1689 /* Just free a reply chain baceuse it wasn't used. */
free_reply(struct local_client * client)1690 static void free_reply(struct local_client *client)
1691 {
1692 /* Add in all the replies, and free them as we go */
1693 struct node_reply *thisreply = client->bits.localsock.replies;
1694 while (thisreply) {
1695 struct node_reply *tempreply = thisreply;
1696
1697 thisreply = thisreply->next;
1698
1699 free(tempreply->replymsg);
1700 free(tempreply);
1701 }
1702 client->bits.localsock.replies = NULL;
1703 }
1704
1705 /* Send our version number to the cluster */
send_version_message()1706 static void send_version_message()
1707 {
1708 char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1709 struct clvm_header *msg = (struct clvm_header *) message;
1710 int version_nums[3];
1711
1712 msg->cmd = CLVMD_CMD_VERSION;
1713 msg->status = 0;
1714 msg->flags = 0;
1715 msg->clientid = 0;
1716 msg->arglen = sizeof(version_nums);
1717
1718 version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1719 version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1720 version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1721
1722 memcpy(&msg->args, version_nums, sizeof(version_nums));
1723
1724 hton_clvm(msg);
1725
1726 clops->cluster_send_message(message, sizeof(message), NULL,
1727 "Error Sending version number");
1728 }
1729
1730 /* Send a message to either a local client or another server */
send_message(void * buf,int msglen,const char * csid,int fd,const char * errtext)1731 static int send_message(void *buf, int msglen, const char *csid, int fd,
1732 const char *errtext)
1733 {
1734 int len = 0;
1735 int saved_errno = 0;
1736 struct timespec delay;
1737 struct timespec remtime;
1738
1739 int retry_cnt = 0;
1740
1741 /* Send remote messages down the cluster socket */
1742 if (csid == NULL || !ISLOCAL_CSID(csid)) {
1743 hton_clvm((struct clvm_header *) buf);
1744 return clops->cluster_send_message(buf, msglen, csid, errtext);
1745 } else {
1746 int ptr = 0;
1747
1748 /* Make sure it all goes */
1749 do {
1750 if (retry_cnt > MAX_RETRIES)
1751 {
1752 errno = saved_errno;
1753 log_error("%s", errtext);
1754 errno = saved_errno;
1755 break;
1756 }
1757
1758 len = write(fd, buf + ptr, msglen - ptr);
1759
1760 if (len <= 0) {
1761 if (errno == EINTR)
1762 continue;
1763 if (errno == EAGAIN ||
1764 errno == EIO ||
1765 errno == ENOSPC) {
1766 saved_errno = errno;
1767 retry_cnt++;
1768
1769 delay.tv_sec = 0;
1770 delay.tv_nsec = 100000;
1771 remtime.tv_sec = 0;
1772 remtime.tv_nsec = 0;
1773 (void) nanosleep (&delay, &remtime);
1774
1775 continue;
1776 }
1777 log_error("%s", errtext);
1778 break;
1779 }
1780 ptr += len;
1781 } while (ptr < msglen);
1782 }
1783 return len;
1784 }
1785
process_work_item(struct lvm_thread_cmd * cmd)1786 static int process_work_item(struct lvm_thread_cmd *cmd)
1787 {
1788 /* If msg is NULL then this is a cleanup request */
1789 if (cmd->msg == NULL) {
1790 DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1791 cmd_client_cleanup(cmd->client);
1792 free(cmd->client);
1793 return 0;
1794 }
1795
1796 if (!cmd->remote) {
1797 DEBUGLOG("process_work_item: local\n");
1798 process_local_command(cmd->msg, cmd->msglen, cmd->client,
1799 cmd->xid);
1800 } else {
1801 DEBUGLOG("process_work_item: remote\n");
1802 process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1803 cmd->csid);
1804 }
1805 return 0;
1806 }
1807
1808 /*
1809 * Routine that runs in the "LVM thread".
1810 */
lvm_thread_fn(void * arg)1811 static void lvm_thread_fn(void *arg)
1812 {
1813 struct dm_list *cmdl, *tmp;
1814 sigset_t ss;
1815 int using_gulm = (int)(long)arg;
1816
1817 DEBUGLOG("LVM thread function started\n");
1818
1819 /* Ignore SIGUSR1 & 2 */
1820 sigemptyset(&ss);
1821 sigaddset(&ss, SIGUSR1);
1822 sigaddset(&ss, SIGUSR2);
1823 pthread_sigmask(SIG_BLOCK, &ss, NULL);
1824
1825 /* Initialise the interface to liblvm */
1826 init_lvm(using_gulm);
1827
1828 /* Allow others to get moving */
1829 pthread_mutex_unlock(&lvm_start_mutex);
1830
1831 /* Now wait for some actual work */
1832 for (;;) {
1833 DEBUGLOG("LVM thread waiting for work\n");
1834
1835 pthread_mutex_lock(&lvm_thread_mutex);
1836 if (dm_list_empty(&lvm_cmd_head))
1837 pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1838
1839 dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1840 struct lvm_thread_cmd *cmd;
1841
1842 cmd =
1843 dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1844 dm_list_del(&cmd->list);
1845 pthread_mutex_unlock(&lvm_thread_mutex);
1846
1847 process_work_item(cmd);
1848 free(cmd->msg);
1849 free(cmd);
1850
1851 pthread_mutex_lock(&lvm_thread_mutex);
1852 }
1853 pthread_mutex_unlock(&lvm_thread_mutex);
1854 }
1855 }
1856
1857 /* Pass down some work to the LVM thread */
add_to_lvmqueue(struct local_client * client,struct clvm_header * msg,int msglen,const char * csid)1858 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1859 int msglen, const char *csid)
1860 {
1861 struct lvm_thread_cmd *cmd;
1862
1863 cmd = malloc(sizeof(struct lvm_thread_cmd));
1864 if (!cmd)
1865 return ENOMEM;
1866
1867 if (msglen) {
1868 cmd->msg = malloc(msglen);
1869 if (!cmd->msg) {
1870 log_error("Unable to allocate buffer space\n");
1871 free(cmd);
1872 return -1;
1873 }
1874 memcpy(cmd->msg, msg, msglen);
1875 }
1876 else {
1877 cmd->msg = NULL;
1878 }
1879 cmd->client = client;
1880 cmd->msglen = msglen;
1881 cmd->xid = client->xid;
1882
1883 if (csid) {
1884 memcpy(cmd->csid, csid, max_csid_len);
1885 cmd->remote = 1;
1886 } else {
1887 cmd->remote = 0;
1888 }
1889
1890 DEBUGLOG
1891 ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1892 cmd, client, msg, msglen, csid, cmd->xid);
1893 pthread_mutex_lock(&lvm_thread_mutex);
1894 dm_list_add(&lvm_cmd_head, &cmd->list);
1895 pthread_cond_signal(&lvm_thread_cond);
1896 pthread_mutex_unlock(&lvm_thread_mutex);
1897
1898 return 0;
1899 }
1900
1901 /* Return 0 if we can talk to an existing clvmd */
check_local_clvmd(void)1902 static int check_local_clvmd(void)
1903 {
1904 int local_socket;
1905 struct sockaddr_un sockaddr;
1906 int ret = 0;
1907
1908 /* Open local socket */
1909 if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1910 return -1;
1911 }
1912
1913 memset(&sockaddr, 0, sizeof(sockaddr));
1914 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1915 sockaddr.sun_family = AF_UNIX;
1916
1917 if (connect(local_socket,(struct sockaddr *) &sockaddr,
1918 sizeof(sockaddr))) {
1919 ret = -1;
1920 }
1921
1922 close(local_socket);
1923 return ret;
1924 }
1925
1926
1927 /* Open the local socket, that's the one we talk to libclvm down */
open_local_sock()1928 static int open_local_sock()
1929 {
1930 int local_socket;
1931 struct sockaddr_un sockaddr;
1932
1933 /* Open local socket */
1934 if (CLVMD_SOCKNAME[0] != '\0')
1935 unlink(CLVMD_SOCKNAME);
1936 local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1937 if (local_socket < 0) {
1938 log_error("Can't create local socket: %s", strerror(errno));
1939 return -1;
1940 }
1941 /* Set Close-on-exec & non-blocking */
1942 fcntl(local_socket, F_SETFD, 1);
1943 fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1944
1945 memset(&sockaddr, 0, sizeof(sockaddr));
1946 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1947 sockaddr.sun_family = AF_UNIX;
1948 if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1949 log_error("can't bind local socket: %s", strerror(errno));
1950 close(local_socket);
1951 return -1;
1952 }
1953 if (listen(local_socket, 1) != 0) {
1954 log_error("listen local: %s", strerror(errno));
1955 close(local_socket);
1956 return -1;
1957 }
1958 if (CLVMD_SOCKNAME[0] != '\0')
1959 chmod(CLVMD_SOCKNAME, 0600);
1960
1961 return local_socket;
1962 }
1963
process_message(struct local_client * client,const char * buf,int len,const char * csid)1964 void process_message(struct local_client *client, const char *buf, int len,
1965 const char *csid)
1966 {
1967 struct clvm_header *inheader;
1968
1969 inheader = (struct clvm_header *) buf;
1970 ntoh_clvm(inheader); /* Byteswap fields */
1971 if (inheader->cmd == CLVMD_CMD_REPLY)
1972 process_reply(inheader, len, csid);
1973 else
1974 add_to_lvmqueue(client, inheader, len, csid);
1975 }
1976
1977
/*
 * Per-node callback used by check_all_clvmds_running(): for a node that is
 * not up, add an EHOSTDOWN reply with an explanatory message to the
 * client's reply list.  Nodes that are up are ignored.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	/* sizeof counts the terminating NUL, giving the 18 bytes sent */
	static const char down_msg[] = "CLVMD not running";

	if (node_up)
		return;

	add_reply_to_list(client, EHOSTDOWN, csid, down_msg,
			  sizeof(down_msg));
}
1985
1986 /* Check to see if all CLVMDs are running (ie one on
1987 every node in the cluster).
1988 If not, returns -1 and prints out a list of errant nodes */
check_all_clvmds_running(struct local_client * client)1989 static int check_all_clvmds_running(struct local_client *client)
1990 {
1991 DEBUGLOG("check_all_clvmds_running\n");
1992 return clops->cluster_do_node_callback(client, check_all_callback);
1993 }
1994
1995 /* Return a local_client struct given a client ID.
1996 client IDs are in network byte order */
find_client(int clientid)1997 static struct local_client *find_client(int clientid)
1998 {
1999 struct local_client *thisfd;
2000 for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2001 if (thisfd->fd == ntohl(clientid))
2002 return thisfd;
2003 }
2004 return NULL;
2005 }
2006
2007 /* Byte-swapping routines for the header so we
2008 work in a heterogeneous environment */
hton_clvm(struct clvm_header * hdr)2009 static void hton_clvm(struct clvm_header *hdr)
2010 {
2011 hdr->status = htonl(hdr->status);
2012 hdr->arglen = htonl(hdr->arglen);
2013 hdr->xid = htons(hdr->xid);
2014 /* Don't swap clientid as it's only a token as far as
2015 remote nodes are concerned */
2016 }
2017
ntoh_clvm(struct clvm_header * hdr)2018 static void ntoh_clvm(struct clvm_header *hdr)
2019 {
2020 hdr->status = ntohl(hdr->status);
2021 hdr->arglen = ntohl(hdr->arglen);
2022 hdr->xid = ntohs(hdr->xid);
2023 }
2024
/*
 * Handler for SIGUSR2 - sent to kill subthreads.  The handler itself only
 * emits a debug message.
 * NOTE(review): DEBUGLOG is called from signal context - confirm that it
 * is async-signal-safe.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;
	DEBUGLOG("SIGUSR2 received\n");
}
2031
sigterm_handler(int sig)2032 static void sigterm_handler(int sig)
2033 {
2034 DEBUGLOG("SIGTERM received\n");
2035 quit = 1;
2036 return;
2037 }
2038
sighup_handler(int sig)2039 static void sighup_handler(int sig)
2040 {
2041 DEBUGLOG("got SIGHUP\n");
2042 reread_config = 1;
2043 }
2044
sync_lock(const char * resource,int mode,int flags,int * lockid)2045 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2046 {
2047 return clops->sync_lock(resource, mode, flags, lockid);
2048 }
2049
sync_unlock(const char * resource,int lockid)2050 int sync_unlock(const char *resource, int lockid)
2051 {
2052 return clops->sync_unlock(resource, lockid);
2053 }
2054
parse_cluster_interface(char * ifname)2055 static if_type_t parse_cluster_interface(char *ifname)
2056 {
2057 if_type_t iface = IF_AUTO;
2058
2059 if (!strcmp(ifname, "auto"))
2060 iface = IF_AUTO;
2061 if (!strcmp(ifname, "cman"))
2062 iface = IF_CMAN;
2063 if (!strcmp(ifname, "gulm"))
2064 iface = IF_GULM;
2065 if (!strcmp(ifname, "openais"))
2066 iface = IF_OPENAIS;
2067 if (!strcmp(ifname, "corosync"))
2068 iface = IF_COROSYNC;
2069
2070 return iface;
2071 }
2072
2073 /*
2074 * Try and find a cluster system in corosync's objdb, if it is running. This is
2075 * only called if the command-line option is not present, and if it fails
2076 * we still try the interfaces in order.
2077 */
get_cluster_type()2078 static if_type_t get_cluster_type()
2079 {
2080 #ifdef HAVE_COROSYNC_CONFDB_H
2081 confdb_handle_t handle;
2082 if_type_t type = IF_AUTO;
2083 int result;
2084 char buf[255];
2085 size_t namelen = sizeof(buf);
2086 hdb_handle_t cluster_handle;
2087 hdb_handle_t clvmd_handle;
2088 confdb_callbacks_t callbacks = {
2089 .confdb_key_change_notify_fn = NULL,
2090 .confdb_object_create_change_notify_fn = NULL,
2091 .confdb_object_delete_change_notify_fn = NULL
2092 };
2093
2094 result = confdb_initialize (&handle, &callbacks);
2095 if (result != CS_OK)
2096 return type;
2097
2098 result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2099 if (result != CS_OK)
2100 goto out;
2101
2102 result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2103 if (result != CS_OK)
2104 goto out;
2105
2106 result = confdb_object_find_start(handle, cluster_handle);
2107 if (result != CS_OK)
2108 goto out;
2109
2110 result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2111 if (result != CS_OK)
2112 goto out;
2113
2114 result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2115 if (result != CS_OK)
2116 goto out;
2117
2118 buf[namelen] = '\0';
2119 type = parse_cluster_interface(buf);
2120 DEBUGLOG("got interface type '%s' from confdb\n", buf);
2121 out:
2122 confdb_finalize(handle);
2123 return type;
2124 #else
2125 return IF_AUTO;
2126 #endif
2127 }
2128