1 /*	$NetBSD: clvmd.c,v 1.1.1.3 2009/12/02 00:27:05 haad Exp $	*/
2 
3 /*
4  * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6  *
7  * This file is part of LVM2.
8  *
9  * This copyrighted material is made available to anyone wishing to use,
10  * modify, copy, or redistribute it subject to the terms and conditions
11  * of the GNU General Public License v.2.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16  */
17 
18 /*
19  * CLVMD: Cluster LVM daemon
20  */
21 
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
24 
25 #include <configure.h>
26 #include <libdevmapper.h>
27 
28 #include <pthread.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 #include <sys/un.h>
34 #include <sys/time.h>
35 #include <sys/ioctl.h>
36 #include <sys/utsname.h>
37 #include <netinet/in.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stddef.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <getopt.h>
46 #include <syslog.h>
47 #include <errno.h>
48 #include <limits.h>
49 #ifdef HAVE_COROSYNC_CONFDB_H
50 #include <corosync/confdb.h>
51 #endif
52 
53 #include "clvmd-comms.h"
54 #include "lvm-functions.h"
55 #include "clvm.h"
56 #include "lvm-version.h"
57 #include "clvmd.h"
58 #include "refresh_clvmd.h"
59 #include "lvm-logging.h"
60 
/* Boolean constants, defined here only when no earlier header supplied them */
61 #ifndef TRUE
62 #define TRUE 1
63 #endif
64 #ifndef FALSE
65 #define FALSE 0
66 #endif
67 
/* NOTE(review): not referenced in the part of the file reviewed here - confirm it is still used */
68 #define MAX_RETRIES 4
69 
/* Non-zero when the first max_csid_len bytes of csid 'c' match our_csid */
70 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
71 
72 /* Head of the fd list. Also contains
73    the cluster_socket details */
/* main() fills in fd/type/callback from clops; the list is chained through ->next */
74 static struct local_client local_client_head;
75 
76 static unsigned short global_xid = 0;	/* Last transaction ID issued */
77 
/* Operations of the cluster backend chosen in main(); stays NULL if none could be initialised */
78 struct cluster_ops *clops = NULL;
79 
/* Our own cluster node ID, filled in by clops->get_our_csid() in main() */
80 static char our_csid[MAX_CSID_LEN];
/* Limits of the selected cluster backend, assigned in main() next to the init_*_cluster() call */
81 static unsigned max_csid_len;
82 static unsigned max_cluster_message;
83 static unsigned max_cluster_member_name_len;
84 
85 /* Structure of items on the LVM thread list */
86 struct lvm_thread_cmd {
87 	struct dm_list list;	/* Linkage for the command list */
88 
89 	struct local_client *client;	/* Client the item belongs to */
90 	struct clvm_header *msg;	/* Message to process; main_loop() queues NULL to request client cleanup */
91 	char csid[MAX_CSID_LEN];	/* presumably the originating node's csid - TODO confirm */
92 	int remote;		/* Flag */
93 	int msglen;		/* Length that accompanies msg */
94 	unsigned short xid;	/* Transaction ID */
95 };
96 
/* Current debug mode; compared against DEBUG_STDERR and DEBUG_SYSLOG in this file */
97 debug_t debug;
/* Thread created by main() to run lvm_thread_fn */
98 static pthread_t lvm_thread;
99 static pthread_mutex_t lvm_thread_mutex;
100 static pthread_cond_t lvm_thread_cond;
/* Locked by main() immediately before the LVM thread is created */
101 static pthread_mutex_t lvm_start_mutex;
/* List head initialised with dm_list_init() in main() */
102 static struct dm_list lvm_cmd_head;
/* main_loop() keeps iterating while this is zero */
103 static volatile sig_atomic_t quit = 0;
/* When set, main_loop() clears it and calls clops->reread_config() if the backend provides one */
104 static volatile sig_atomic_t reread_config = 0;
/* Pipe created in be_daemon(); child_init_signal() writes the init status to child_pipe[1] */
105 static int child_pipe[2];
106 
107 /* Reasons the daemon failed initialisation */
108 #define DFAIL_INIT       1
109 #define DFAIL_LOCAL_SOCK 2
110 #define DFAIL_CLUSTER_IF 3
111 #define DFAIL_MALLOC     4
112 #define DFAIL_TIMEOUT    5
113 #define SUCCESS          0
114 
/* Cluster manager selection; IF_AUTO makes main() call get_cluster_type() */
115 typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;
116 
/* Thread start-routine type; main() casts lvm_thread_fn to a pointer to this for pthread_create() */
117 typedef void *(lvm_pthread_fn_t)(void*);
118 
119 /* Prototypes for code further down */
/* Signal handlers installed with signal() in main() */
120 static void sigusr2_handler(int sig);
121 static void sighup_handler(int sig);
122 static void sigterm_handler(int sig);
123 static void send_local_reply(struct local_client *client, int status,
124 			     int clientid);
125 static void free_reply(struct local_client *client);
126 static void send_version_message(void);
127 static void *pre_and_post_thread(void *arg);
128 static int send_message(void *buf, int msglen, const char *csid, int fd,
129 			const char *errtext);
130 static int read_from_local_sock(struct local_client *thisfd);
131 static int process_local_command(struct clvm_header *msg, int msglen,
132 				 struct local_client *client,
133 				 unsigned short xid);
134 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
135 				   const char *csid);
136 static int process_reply(const struct clvm_header *msg, int msglen,
137 			 const char *csid);
138 static int open_local_sock(void);
139 static int check_local_clvmd(void);
140 static struct local_client *find_client(int clientid);
141 static void main_loop(int local_sock, int cmd_timeout);
142 static void be_daemon(int start_timeout);
143 static int check_all_clvmds_running(struct local_client *client);
144 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
145 				     int len, const char *csid,
146 				     struct local_client **new_client);
/* NOTE(review): declared as returning void, yet main() casts it to
   lvm_pthread_fn_t (a void *(void *) function) for pthread_create() */
147 static void lvm_thread_fn(void *) __attribute__ ((noreturn));
148 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
149 			   int msglen, const char *csid);
150 static int distribute_command(struct local_client *thisfd);
151 static void hton_clvm(struct clvm_header *hdr);
152 static void ntoh_clvm(struct clvm_header *hdr);
153 static void add_reply_to_list(struct local_client *client, int status,
154 			      const char *csid, const char *buf, int len);
155 static if_type_t parse_cluster_interface(char *ifname);
156 static if_type_t get_cluster_type(void);
157 
/*
 * Print the command-line help text to 'file'.
 *
 * prog: program name shown on the synopsis line.
 * file: destination stream (main() passes stdout for -h and stderr on errors).
 *
 * The list of cluster managers at the end depends on which USE_* macros
 * were defined when this file was compiled.
 */
static void usage(char *prog, FILE *file)
{
	/* Fixed option descriptions, emitted verbatim in this order */
	static const char *const option_help[] = {
		"   -V       Show version of clvmd\n",
		"   -h       Show this help information\n",
		"   -d       Set debug level\n",
		"            If starting clvmd then don't fork, run in the foreground\n",
		"   -R       Tell all running clvmds in the cluster to reload their device cache\n",
		"   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n",
		"   -t<secs> Command timeout (default 60 seconds)\n",
		"   -T<secs> Startup timeout (default none)\n",
		"   -I<cmgr> Cluster manager (default: auto)\n",
		"            Available cluster managers: ",
	};
	size_t idx;

	fputs("Usage:\n", file);
	fprintf(file, "%s [Vhd]\n", prog);
	fputs("\n", file);

	for (idx = 0; idx < sizeof(option_help) / sizeof(option_help[0]); idx++)
		fputs(option_help[idx], file);

	/* Only the cluster managers compiled into this binary are listed */
#ifdef USE_COROSYNC
	fputs("corosync ", file);
#endif
#ifdef USE_CMAN
	fputs("cman ", file);
#endif
#ifdef USE_OPENAIS
	fputs("openais ", file);
#endif
#ifdef USE_GULM
	fputs("gulm ", file);
#endif
	fputs("\n", file);
}
187 
188 /* Called to signal the parent how well we got on during initialisation */
189 static void child_init_signal(int status)
190 {
191         if (child_pipe[1]) {
192 	        write(child_pipe[1], &status, sizeof(status));
193 		close(child_pipe[1]);
194 	}
195 	if (status)
196 	        exit(status);
197 }
198 
199 
200 void debuglog(const char *fmt, ...)
201 {
202 	time_t P;
203 	va_list ap;
204 	static int syslog_init = 0;
205 
206 	if (debug == DEBUG_STDERR) {
207 		va_start(ap,fmt);
208 		time(&P);
209 		fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
210 		vfprintf(stderr, fmt, ap);
211 		va_end(ap);
212 	}
213 	if (debug == DEBUG_SYSLOG) {
214 		if (!syslog_init) {
215 			openlog("clvmd", LOG_PID, LOG_DAEMON);
216 			syslog_init = 1;
217 		}
218 
219 		va_start(ap,fmt);
220 		vsyslog(LOG_DEBUG, fmt, ap);
221 		va_end(ap);
222 	}
223 }
224 
225 static const char *decode_cmd(unsigned char cmdl)
226 {
227 	static char buf[128];
228 	const char *command;
229 
230 	switch (cmdl) {
231 	case CLVMD_CMD_TEST:
232 		command = "TEST";
233 		break;
234 	case CLVMD_CMD_LOCK_VG:
235 		command = "LOCK_VG";
236 		break;
237 	case CLVMD_CMD_LOCK_LV:
238 		command = "LOCK_LV";
239 		break;
240 	case CLVMD_CMD_REFRESH:
241 		command = "REFRESH";
242 		break;
243 	case CLVMD_CMD_SET_DEBUG:
244 		command = "SET_DEBUG";
245 		break;
246 	case CLVMD_CMD_GET_CLUSTERNAME:
247 		command = "GET_CLUSTERNAME";
248 		break;
249 	case CLVMD_CMD_VG_BACKUP:
250 		command = "VG_BACKUP";
251 		break;
252 	case CLVMD_CMD_REPLY:
253 		command = "REPLY";
254 		break;
255 	case CLVMD_CMD_VERSION:
256 		command = "VERSION";
257 		break;
258 	case CLVMD_CMD_GOAWAY:
259 		command = "GOAWAY";
260 		break;
261 	case CLVMD_CMD_LOCK:
262 		command = "LOCK";
263 		break;
264 	case CLVMD_CMD_UNLOCK:
265 		command = "UNLOCK";
266 		break;
267 	case CLVMD_CMD_LOCK_QUERY:
268 		command = "LOCK_QUERY";
269 		break;
270 	default:
271 		command = "unknown";
272 		break;
273 	}
274 
275 	sprintf(buf, "%s (0x%x)", command, cmdl);
276 
277 	return buf;
278 }
279 
280 int main(int argc, char *argv[])
281 {
282 	int local_sock;
283 	struct local_client *newfd;
284 	struct utsname nodeinfo;
285 	signed char opt;
286 	int cmd_timeout = DEFAULT_CMD_TIMEOUT;
287 	int start_timeout = 0;
288 	if_type_t cluster_iface = IF_AUTO;
289 	sigset_t ss;
290 	int using_gulm = 0;
291 	int debug_opt = 0;
292 	int clusterwide_opt = 0;
293 
294 	/* Deal with command-line arguments */
295 	opterr = 0;
296 	optind = 0;
297 	while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
298 		switch (opt) {
299 		case 'h':
300 			usage(argv[0], stdout);
301 			exit(0);
302 
303 		case '?':
304 			usage(argv[0], stderr);
305 			exit(0);
306 
307 		case 'R':
308 			return refresh_clvmd()==1?0:1;
309 
310 		case 'C':
311 			clusterwide_opt = 1;
312 			break;
313 
314 		case 'd':
315 			debug_opt = 1;
316 			if (optarg)
317 				debug = atoi(optarg);
318 			else
319 				debug = DEBUG_STDERR;
320 			break;
321 
322 		case 't':
323 			cmd_timeout = atoi(optarg);
324 			if (!cmd_timeout) {
325 				fprintf(stderr, "command timeout is invalid\n");
326 				usage(argv[0], stderr);
327 				exit(1);
328 			}
329 			break;
330 		case 'I':
331 			cluster_iface = parse_cluster_interface(optarg);
332 			break;
333 		case 'T':
334 			start_timeout = atoi(optarg);
335 			if (start_timeout <= 0) {
336 				fprintf(stderr, "startup timeout is invalid\n");
337 				usage(argv[0], stderr);
338 				exit(1);
339 			}
340 			break;
341 
342 		case 'V':
343 		        printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
344 			printf("Protocol version:           %d.%d.%d\n",
345 			       CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
346 			       CLVMD_PATCH_VERSION);
347 			exit(1);
348 			break;
349 
350 		}
351 	}
352 
353 	/* Setting debug options on an existing clvmd */
354 	if (debug_opt && !check_local_clvmd()) {
355 
356 		/* Sending to stderr makes no sense for a detached daemon */
357 		if (debug == DEBUG_STDERR)
358 			debug = DEBUG_SYSLOG;
359 		return debug_clvmd(debug, clusterwide_opt)==1?0:1;
360 	}
361 
362 	/* Fork into the background (unless requested not to) */
363 	if (debug != DEBUG_STDERR) {
364 		be_daemon(start_timeout);
365 	}
366 
367 	DEBUGLOG("CLVMD started\n");
368 
369 	/* Open the Unix socket we listen for commands on.
370 	   We do this before opening the cluster socket so that
371 	   potential clients will block rather than error if we are running
372 	   but the cluster is not ready yet */
373 	local_sock = open_local_sock();
374 	if (local_sock < 0)
375 		child_init_signal(DFAIL_LOCAL_SOCK);
376 
377 	/* Set up signal handlers, USR1 is for cluster change notifications (in cman)
378 	   USR2 causes child threads to exit.
379 	   HUP causes gulm version to re-read nodes list from CCS.
380 	   PIPE should be ignored */
381 	signal(SIGUSR2, sigusr2_handler);
382 	signal(SIGHUP,  sighup_handler);
383 	signal(SIGPIPE, SIG_IGN);
384 
385 	/* Block SIGUSR2/SIGINT/SIGTERM in process */
386 	sigemptyset(&ss);
387 	sigaddset(&ss, SIGUSR2);
388 	sigaddset(&ss, SIGINT);
389 	sigaddset(&ss, SIGTERM);
390 	sigprocmask(SIG_BLOCK, &ss, NULL);
391 
392 	/* Initialise the LVM thread variables */
393 	dm_list_init(&lvm_cmd_head);
394 	pthread_mutex_init(&lvm_thread_mutex, NULL);
395 	pthread_cond_init(&lvm_thread_cond, NULL);
396 	pthread_mutex_init(&lvm_start_mutex, NULL);
397 	init_lvhash();
398 
399 	/* Start the cluster interface */
400 	if (cluster_iface == IF_AUTO)
401 		cluster_iface = get_cluster_type();
402 
403 #ifdef USE_CMAN
404 	if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
405 		max_csid_len = CMAN_MAX_CSID_LEN;
406 		max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
407 		max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
408 		syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
409 	}
410 #endif
411 #ifdef USE_GULM
412 	if (!clops)
413 		if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
414 			max_csid_len = GULM_MAX_CSID_LEN;
415 			max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
416 			max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
417 			using_gulm = 1;
418 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
419 		}
420 #endif
421 #ifdef USE_COROSYNC
422 	if (!clops)
423 		if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
424 			max_csid_len = COROSYNC_CSID_LEN;
425 			max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
426 			max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
427 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
428 		}
429 #endif
430 #ifdef USE_OPENAIS
431 	if (!clops)
432 		if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
433 			max_csid_len = OPENAIS_CSID_LEN;
434 			max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
435 			max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
436 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
437 		}
438 #endif
439 
440 	if (!clops) {
441 		DEBUGLOG("Can't initialise cluster interface\n");
442 		log_error("Can't initialise cluster interface\n");
443 		child_init_signal(DFAIL_CLUSTER_IF);
444 	}
445 	DEBUGLOG("Cluster ready, doing some more initialisation\n");
446 
447 	/* Save our CSID */
448 	uname(&nodeinfo);
449 	clops->get_our_csid(our_csid);
450 
451 	/* Initialise the FD list head */
452 	local_client_head.fd = clops->get_main_cluster_fd();
453 	local_client_head.type = CLUSTER_MAIN_SOCK;
454 	local_client_head.callback = clops->cluster_fd_callback;
455 
456 	/* Add the local socket to the list */
457 	newfd = malloc(sizeof(struct local_client));
458 	if (!newfd)
459 	        child_init_signal(DFAIL_MALLOC);
460 
461 	newfd->fd = local_sock;
462 	newfd->removeme = 0;
463 	newfd->type = LOCAL_RENDEZVOUS;
464 	newfd->callback = local_rendezvous_callback;
465 	newfd->next = local_client_head.next;
466 	local_client_head.next = newfd;
467 
468 	/* This needs to be started after cluster initialisation
469 	   as it may need to take out locks */
470 	DEBUGLOG("starting LVM thread\n");
471 
472 	/* Don't let anyone else to do work until we are started */
473 	pthread_mutex_lock(&lvm_start_mutex);
474 	pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
475 			(void *)(long)using_gulm);
476 
477 	/* Tell the rest of the cluster our version number */
478 	/* CMAN can do this immediately, gulm needs to wait until
479 	   the core initialisation has finished and the node list
480 	   has been gathered */
481 	if (clops->cluster_init_completed)
482 		clops->cluster_init_completed();
483 
484 	DEBUGLOG("clvmd ready for work\n");
485 	child_init_signal(SUCCESS);
486 
487 	/* Try to shutdown neatly */
488 	signal(SIGTERM, sigterm_handler);
489 	signal(SIGINT, sigterm_handler);
490 
491 	/* Do some work */
492 	main_loop(local_sock, cmd_timeout);
493 
494 	destroy_lvm();
495 
496 	return 0;
497 }
498 
/* Announce our protocol version to the cluster by sending the version
   message.  Intended to be called once the cluster layer has completed
   initialisation (GuLM has to wait for its node list first).
   NOTE(review): presumably reached through clops->cluster_init_completed
   in main(); the hook-up is in the cluster backends - confirm. */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
505 
/* Data on a connected socket.
   Hands the descriptor to read_from_local_sock() and passes its result
   back to the caller; buf, len and csid are not used here.  A connected
   client never produces a new client, so *new_client is always NULL. */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	int result;

	*new_client = NULL;
	result = read_from_local_sock(thisfd);

	return result;
}
514 
515 /* Data on a connected socket */
516 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
517 				     int len, const char *csid,
518 				     struct local_client **new_client)
519 {
520 	/* Someone connected to our local socket, accept it. */
521 
522 	struct sockaddr_un socka;
523 	struct local_client *newfd;
524 	socklen_t sl = sizeof(socka);
525 	int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
526 
527 	if (client_fd == -1 && errno == EINTR)
528 		return 1;
529 
530 	if (client_fd >= 0) {
531 		newfd = malloc(sizeof(struct local_client));
532 		if (!newfd) {
533 			close(client_fd);
534 			return 1;
535 		}
536 		newfd->fd = client_fd;
537 		newfd->type = LOCAL_SOCK;
538 		newfd->xid = 0;
539 		newfd->removeme = 0;
540 		newfd->callback = local_sock_callback;
541 		newfd->bits.localsock.replies = NULL;
542 		newfd->bits.localsock.expected_replies = 0;
543 		newfd->bits.localsock.cmd = NULL;
544 		newfd->bits.localsock.in_progress = FALSE;
545 		newfd->bits.localsock.sent_out = FALSE;
546 		newfd->bits.localsock.threadid = 0;
547 		newfd->bits.localsock.finished = 0;
548 		newfd->bits.localsock.pipe_client = NULL;
549 		newfd->bits.localsock.private = NULL;
550 		newfd->bits.localsock.all_success = 1;
551 		DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
552 		*new_client = newfd;
553 	}
554 	return 1;
555 }
556 
557 static int local_pipe_callback(struct local_client *thisfd, char *buf,
558 			       int maxlen, const char *csid,
559 			       struct local_client **new_client)
560 {
561 	int len;
562 	char buffer[PIPE_BUF];
563 	struct local_client *sock_client = thisfd->bits.pipe.client;
564 	int status = -1;	/* in error by default */
565 
566 	len = read(thisfd->fd, buffer, sizeof(int));
567 	if (len == -1 && errno == EINTR)
568 		return 1;
569 
570 	if (len == sizeof(int)) {
571 		memcpy(&status, buffer, sizeof(int));
572 	}
573 
574 	DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
575 		 thisfd->fd, len, status);
576 
577 	/* EOF on pipe or an error, close it */
578 	if (len <= 0) {
579 		int jstat;
580 		void *ret = &status;
581 		close(thisfd->fd);
582 
583 		/* Clear out the cross-link */
584 		if (thisfd->bits.pipe.client != NULL)
585 			thisfd->bits.pipe.client->bits.localsock.pipe_client =
586 			    NULL;
587 
588 		/* Reap child thread */
589 		if (thisfd->bits.pipe.threadid) {
590 			jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
591 			thisfd->bits.pipe.threadid = 0;
592 			if (thisfd->bits.pipe.client != NULL)
593 				thisfd->bits.pipe.client->bits.localsock.
594 				    threadid = 0;
595 		}
596 		return -1;
597 	} else {
598 		DEBUGLOG("background routine status was %d, sock_client=%p\n",
599 			 status, sock_client);
600 		/* But has the client gone away ?? */
601 		if (sock_client == NULL) {
602 			DEBUGLOG
603 			    ("Got PIPE response for dead client, ignoring it\n");
604 		} else {
605 			/* If error then just return that code */
606 			if (status)
607 				send_local_reply(sock_client, status,
608 						 sock_client->fd);
609 			else {
610 				if (sock_client->bits.localsock.state ==
611 				    POST_COMMAND) {
612 					send_local_reply(sock_client, 0,
613 							 sock_client->fd);
614 				} else	// PRE_COMMAND finished.
615 				{
616 					if (
617 					    (status =
618 					     distribute_command(sock_client)) !=
619 					    0) send_local_reply(sock_client,
620 								EFBIG,
621 								sock_client->
622 								fd);
623 				}
624 			}
625 		}
626 	}
627 	return len;
628 }
629 
630 /* If a noed is up, look for it in the reply array, if it's not there then
631    add one with "ETIMEDOUT".
632    NOTE: This won't race with real replies because they happen in the same thread.
633 */
634 static void timedout_callback(struct local_client *client, const char *csid,
635 			      int node_up)
636 {
637 	if (node_up) {
638 		struct node_reply *reply;
639 		char nodename[max_cluster_member_name_len];
640 
641 		clops->name_from_csid(csid, nodename);
642 		DEBUGLOG("Checking for a reply from %s\n", nodename);
643 		pthread_mutex_lock(&client->bits.localsock.reply_mutex);
644 
645 		reply = client->bits.localsock.replies;
646 		while (reply && strcmp(reply->node, nodename) != 0) {
647 			reply = reply->next;
648 		}
649 
650 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
651 
652 		if (!reply) {
653 			DEBUGLOG("Node %s timed-out\n", nodename);
654 			add_reply_to_list(client, ETIMEDOUT, csid,
655 					  "Command timed out", 18);
656 		}
657 	}
658 }
659 
660 /* Called when the request has timed out on at least one node. We fill in
661    the remaining node entries with ETIMEDOUT and return.
662 
663    By the time we get here the node that caused
664    the timeout could have gone down, in which case we will never get the expected
665    number of replies that triggers the post command so we need to do it here
666 */
667 static void request_timed_out(struct local_client *client)
668 {
669 	DEBUGLOG("Request timed-out. padding\n");
670 	clops->cluster_do_node_callback(client, timedout_callback);
671 
672 	if (client->bits.localsock.num_replies !=
673 	    client->bits.localsock.expected_replies) {
674 		/* Post-process the command */
675 		if (client->bits.localsock.threadid) {
676 			pthread_mutex_lock(&client->bits.localsock.mutex);
677 			client->bits.localsock.state = POST_COMMAND;
678 			pthread_cond_signal(&client->bits.localsock.cond);
679 			pthread_mutex_unlock(&client->bits.localsock.mutex);
680 		}
681 	}
682 }
683 
684 /* This is where the real work happens */
/*
 * Run the daemon's select() loop until 'quit' becomes non-zero or a
 * cluster socket callback reports EOF/error.
 *
 * local_sock:  listening socket; only closed here on the way out.
 * cmd_timeout: seconds used both as the select() timeout and as the age
 *              after which an outstanding request is considered timed out.
 *
 * Each pass: build the fd set from the client list, select(), honour a
 * pending reread_config request, dispatch callbacks for ready fds, and on
 * a select() timeout look for requests that have waited too long.
 */
685 static void main_loop(int local_sock, int cmd_timeout)
686 {
687 	DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
688 
	/* main() blocked SIGINT/SIGTERM with sigprocmask(); unblock them for this thread */
689 	sigset_t ss;
690 	sigemptyset(&ss);
691 	sigaddset(&ss, SIGINT);
692 	sigaddset(&ss, SIGTERM);
693 	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
694 	/* Main loop */
695 	while (!quit) {
696 		fd_set in;
697 		int select_status;
698 		struct local_client *thisfd;
699 		struct timeval tv = { cmd_timeout, 0 };
700 		int quorate = clops->is_quorate();
701 
702 		/* Wait on the cluster FD and all local sockets/pipes */
		/* The cluster fd is re-fetched on every pass */
703 		local_client_head.fd = clops->get_main_cluster_fd();
704 		FD_ZERO(&in);
705 		for (thisfd = &local_client_head; thisfd != NULL;
706 		     thisfd = thisfd->next) {
707 
708 			if (thisfd->removeme)
709 				continue;
710 
711 			/* if the cluster is not quorate then don't listen for new requests */
712 			if ((thisfd->type != LOCAL_RENDEZVOUS &&
713 			     thisfd->type != LOCAL_SOCK) || quorate)
714 				FD_SET(thisfd->fd, &in);
715 		}
716 
717 		select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
718 
		/* errno is saved and restored so the EINTR test further down still sees select()'s value */
719 		if (reread_config) {
720 			int saved_errno = errno;
721 
722 			reread_config = 0;
723 			if (clops->reread_config)
724 				clops->reread_config();
725 			errno = saved_errno;
726 		}
727 
728 		if (select_status > 0) {
729 			struct local_client *lastfd = NULL;
730 			char csid[MAX_CSID_LEN];
731 			char buf[max_cluster_message];
732 
			/* lastfd trails one node behind thisfd so a node can be unlinked.
			   Every unlink or insert below ends the scan with 'break'; the
			   remaining ready fds are picked up by the next select().
			   NOTE(review): lastfd is still NULL while thisfd is the list
			   head, so the unlink paths assume the head is never removed. */
733 			for (thisfd = &local_client_head; thisfd != NULL;
734 			     thisfd = thisfd->next) {
735 
736 				if (thisfd->removeme) {
737 					struct local_client *free_fd;
738 					lastfd->next = thisfd->next;
739 					free_fd = thisfd;
740 					thisfd = lastfd;
741 
742 					DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
743 
744 					/* Queue cleanup, this also frees the client struct */
745 					add_to_lvmqueue(free_fd, NULL, 0, NULL);
746 					break;
747 				}
748 
749 				if (FD_ISSET(thisfd->fd, &in)) {
750 					struct local_client *newfd = NULL;
751 					int ret;
752 
753 					/* Do callback */
754 					ret =
755 					    thisfd->callback(thisfd, buf,
756 							     sizeof(buf), csid,
757 							     &newfd);
758 					/* Ignore EAGAIN */
					/* NOTE(review): errno is not reset before the callback, so a
					   stale EAGAIN/EINTR could mask a genuine failure here */
759 					if (ret < 0 && (errno == EAGAIN ||
760 							errno == EINTR)) continue;
761 
762 					/* Got error or EOF: Remove it from the list safely */
763 					if (ret <= 0) {
764 						struct local_client *free_fd;
765 						int type = thisfd->type;
766 
767 						/* If the cluster socket shuts down, so do we */
768 						if (type == CLUSTER_MAIN_SOCK ||
769 						    type == CLUSTER_INTERNAL)
770 							goto closedown;
771 
772 						DEBUGLOG("ret == %d, errno = %d. removing client\n",
773 							 ret, errno);
774 						lastfd->next = thisfd->next;
775 						free_fd = thisfd;
776 						thisfd = lastfd;
						/* NOTE(review): local_pipe_callback() and read_from_local_sock()
						   already close their fd before returning <= 0, so this is a
						   second close() of the same descriptor number */
777 						close(free_fd->fd);
778 
779 						/* Queue cleanup, this also frees the client struct */
780 						add_to_lvmqueue(free_fd, NULL, 0, NULL);
781 						break;
782 					}
783 
784 					/* New client...simply add it to the list */
785 					if (newfd) {
786 						newfd->next = thisfd->next;
787 						thisfd->next = newfd;
788 						break;
789 					}
790 				}
791 				lastfd = thisfd;
792 			}
793 		}
794 
795 		/* Select timed out. Check for clients that have been waiting too long for a response */
		/* This check only runs when select() returns 0, i.e. no fd became ready within cmd_timeout */
796 		if (select_status == 0) {
797 			time_t the_time = time(NULL);
798 
799 			for (thisfd = &local_client_head; thisfd != NULL;
800 			     thisfd = thisfd->next) {
801 				if (thisfd->type == LOCAL_SOCK
802 				    && thisfd->bits.localsock.sent_out
803 				    && thisfd->bits.localsock.sent_time +
804 				    cmd_timeout < the_time
805 				    && thisfd->bits.localsock.
806 				    expected_replies !=
807 				    thisfd->bits.localsock.num_replies) {
808 					/* Send timed out message + replies we already have */
809 					DEBUGLOG
810 					    ("Request timed-out (send: %ld, now: %ld)\n",
811 					     thisfd->bits.localsock.sent_time,
812 					     the_time);
813 
814 					thisfd->bits.localsock.all_success = 0;
815 
816 					request_timed_out(thisfd);
817 				}
818 			}
819 		}
820 		if (select_status < 0) {
821 			if (errno == EINTR)
822 				continue;
823 
			/* NOTE(review): without DEBUG any other select() error is ignored and the loop simply retries */
824 #ifdef DEBUG
825 			perror("select error");
826 			exit(-1);
827 #endif
828 		}
829 	}
830 
	/* Reached when 'quit' is set or a cluster socket callback returned <= 0 */
831       closedown:
832 	clops->cluster_closedown();
833 	close(local_sock);
834 }
835 
836 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
837 {
838 	int child_status;
839 	int sstat;
840 	fd_set fds;
841 	struct timeval tv = {timeout, 0};
842 
843 	FD_ZERO(&fds);
844 	FD_SET(c_pipe, &fds);
845 
846 	sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
847 	if (sstat == 0) {
848 		fprintf(stderr, "clvmd startup timed out\n");
849 		exit(DFAIL_TIMEOUT);
850 	}
851 	if (sstat == 1) {
852 		if (read(c_pipe, &child_status, sizeof(child_status)) !=
853 		    sizeof(child_status)) {
854 
855 			fprintf(stderr, "clvmd failed in initialisation\n");
856 			exit(DFAIL_INIT);
857 		}
858 		else {
859 			switch (child_status) {
860 			case SUCCESS:
861 				break;
862 			case DFAIL_INIT:
863 				fprintf(stderr, "clvmd failed in initialisation\n");
864 				break;
865 			case DFAIL_LOCAL_SOCK:
866 				fprintf(stderr, "clvmd could not create local socket\n");
867 				fprintf(stderr, "Another clvmd is probably already running\n");
868 				break;
869 			case DFAIL_CLUSTER_IF:
870 				fprintf(stderr, "clvmd could not connect to cluster manager\n");
871 				fprintf(stderr, "Consult syslog for more information\n");
872 				break;
873 			case DFAIL_MALLOC:
874 				fprintf(stderr, "clvmd failed, not enough memory\n");
875 				break;
876 			default:
877 				fprintf(stderr, "clvmd failed, error was %d\n", child_status);
878 				break;
879 			}
880 			exit(child_status);
881 		}
882 	}
883 	fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
884 	exit(DFAIL_INIT);
885 }
886 
887 /*
888  * Fork into the background and detach from our parent process.
889  * In the interests of user-friendliness we wait for the daemon
890  * to complete initialisation before returning its status
891  * the the user.
892  */
893 static void be_daemon(int timeout)
894 {
895         pid_t pid;
896 	int devnull = open("/dev/null", O_RDWR);
897 	if (devnull == -1) {
898 		perror("Can't open /dev/null");
899 		exit(3);
900 	}
901 
902 	pipe(child_pipe);
903 
904 	switch (pid = fork()) {
905 	case -1:
906 		perror("clvmd: can't fork");
907 		exit(2);
908 
909 	case 0:		/* Child */
910 	        close(child_pipe[0]);
911 		break;
912 
913 	default:       /* Parent */
914 		close(child_pipe[1]);
915 		wait_for_child(child_pipe[0], timeout);
916 	}
917 
918 	/* Detach ourself from the calling environment */
919 	if (close(0) || close(1) || close(2)) {
920 		perror("Error closing terminal FDs");
921 		exit(4);
922 	}
923 	setsid();
924 
925 	if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
926 	    || dup2(devnull, 2) < 0) {
927 		perror("Error setting terminal FDs to /dev/null");
928 		log_error("Error setting terminal FDs to /dev/null: %m");
929 		exit(5);
930 	}
931 	if (chdir("/")) {
932 		log_error("Error setting current directory to /: %m");
933 		exit(6);
934 	}
935 
936 }
937 
938 /* Called when we have a read from the local socket.
939    was in the main loop but it's grown up and is a big girl now */
940 static int read_from_local_sock(struct local_client *thisfd)
941 {
942 	int len;
943 	int argslen;
944 	int missing_len;
945 	char buffer[PIPE_BUF];
946 
947 	len = read(thisfd->fd, buffer, sizeof(buffer));
948 	if (len == -1 && errno == EINTR)
949 		return 1;
950 
951 	DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
952 
953 	/* EOF or error on socket */
954 	if (len <= 0) {
955 		int *status;
956 		int jstat;
957 
958 		DEBUGLOG("EOF on local socket: inprogress=%d\n",
959 			 thisfd->bits.localsock.in_progress);
960 
961 		thisfd->bits.localsock.finished = 1;
962 
963 		/* If the client went away in mid command then tidy up */
964 		if (thisfd->bits.localsock.in_progress) {
965 			pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
966 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
967 			thisfd->bits.localsock.state = POST_COMMAND;
968 			pthread_cond_signal(&thisfd->bits.localsock.cond);
969 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
970 
971 			/* Free any unsent buffers */
972 			free_reply(thisfd);
973 		}
974 
975 		/* Kill the subthread & free resources */
976 		if (thisfd->bits.localsock.threadid) {
977 			DEBUGLOG("Waiting for child thread\n");
978 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
979 			thisfd->bits.localsock.state = PRE_COMMAND;
980 			pthread_cond_signal(&thisfd->bits.localsock.cond);
981 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
982 
983 			jstat =
984 			    pthread_join(thisfd->bits.localsock.threadid,
985 					 (void **) &status);
986 			DEBUGLOG("Joined child thread\n");
987 
988 			thisfd->bits.localsock.threadid = 0;
989 			pthread_cond_destroy(&thisfd->bits.localsock.cond);
990 			pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
991 
992 			/* Remove the pipe client */
993 			if (thisfd->bits.localsock.pipe_client != NULL) {
994 				struct local_client *newfd;
995 				struct local_client *lastfd = NULL;
996 				struct local_client *free_fd = NULL;
997 
998 				close(thisfd->bits.localsock.pipe_client->fd);	/* Close pipe */
999 				close(thisfd->bits.localsock.pipe);
1000 
1001 				/* Remove pipe client */
1002 				for (newfd = &local_client_head; newfd != NULL;
1003 				     newfd = newfd->next) {
1004 					if (thisfd->bits.localsock.
1005 					    pipe_client == newfd) {
1006 						thisfd->bits.localsock.
1007 						    pipe_client = NULL;
1008 
1009 						lastfd->next = newfd->next;
1010 						free_fd = newfd;
1011 						newfd->next = lastfd;
1012 						free(free_fd);
1013 						break;
1014 					}
1015 					lastfd = newfd;
1016 				}
1017 			}
1018 		}
1019 
1020 		/* Free the command buffer */
1021 		free(thisfd->bits.localsock.cmd);
1022 
1023 		/* Clear out the cross-link */
1024 		if (thisfd->bits.localsock.pipe_client != NULL)
1025 			thisfd->bits.localsock.pipe_client->bits.pipe.client =
1026 			    NULL;
1027 
1028 		close(thisfd->fd);
1029 		return 0;
1030 	} else {
1031 		int comms_pipe[2];
1032 		struct local_client *newfd;
1033 		char csid[MAX_CSID_LEN];
1034 		struct clvm_header *inheader;
1035 		int status;
1036 
1037 		inheader = (struct clvm_header *) buffer;
1038 
1039 		/* Fill in the client ID */
1040 		inheader->clientid = htonl(thisfd->fd);
1041 
1042 		/* If we are already busy then return an error */
1043 		if (thisfd->bits.localsock.in_progress) {
1044 			struct clvm_header reply;
1045 			reply.cmd = CLVMD_CMD_REPLY;
1046 			reply.status = EBUSY;
1047 			reply.arglen = 0;
1048 			reply.flags = 0;
1049 			send_message(&reply, sizeof(reply), our_csid,
1050 				     thisfd->fd,
1051 				     "Error sending EBUSY reply to local user");
1052 			return len;
1053 		}
1054 
1055 		/* Free any old buffer space */
1056 		free(thisfd->bits.localsock.cmd);
1057 
1058 		/* See if we have the whole message */
1059 		argslen =
1060 		    len - strlen(inheader->node) - sizeof(struct clvm_header);
1061 		missing_len = inheader->arglen - argslen;
1062 
1063 		if (missing_len < 0)
1064 			missing_len = 0;
1065 
1066 		/* Save the message */
1067 		thisfd->bits.localsock.cmd = malloc(len + missing_len);
1068 
1069 		if (!thisfd->bits.localsock.cmd) {
1070 			struct clvm_header reply;
1071 			reply.cmd = CLVMD_CMD_REPLY;
1072 			reply.status = ENOMEM;
1073 			reply.arglen = 0;
1074 			reply.flags = 0;
1075 			send_message(&reply, sizeof(reply), our_csid,
1076 				     thisfd->fd,
1077 				     "Error sending ENOMEM reply to local user");
1078 			return 0;
1079 		}
1080 		memcpy(thisfd->bits.localsock.cmd, buffer, len);
1081 		thisfd->bits.localsock.cmd_len = len + missing_len;
1082 		inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1083 
1084 		/* If we don't have the full message then read the rest now */
1085 		if (missing_len) {
1086 			char *argptr =
1087 			    inheader->node + strlen(inheader->node) + 1;
1088 
1089 			while (missing_len > 0 && len >= 0) {
1090 				DEBUGLOG
1091 				    ("got %d bytes, need another %d (total %d)\n",
1092 				     argslen, missing_len, inheader->arglen);
1093 				len = read(thisfd->fd, argptr + argslen,
1094 					   missing_len);
1095 				if (len >= 0) {
1096 					missing_len -= len;
1097 					argslen += len;
1098 				}
1099 			}
1100 		}
1101 
1102 		/* Initialise and lock the mutex so the subthread will wait after
1103 		   finishing the PRE routine */
1104 		if (!thisfd->bits.localsock.threadid) {
1105 			pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1106 			pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1107 			pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1108 		}
1109 
1110 		/* Only run the command if all the cluster nodes are running CLVMD */
1111 		if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1112 		    (check_all_clvmds_running(thisfd) == -1)) {
1113 			thisfd->bits.localsock.expected_replies = 0;
1114 			thisfd->bits.localsock.num_replies = 0;
1115 			send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1116 			return len;
1117 		}
1118 
1119 		/* Check the node name for validity */
1120 		if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1121 			/* Error, node is not in the cluster */
1122 			struct clvm_header reply;
1123 			DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1124 
1125 			reply.cmd = CLVMD_CMD_REPLY;
1126 			reply.status = ENOENT;
1127 			reply.flags = 0;
1128 			reply.arglen = 0;
1129 			send_message(&reply, sizeof(reply), our_csid,
1130 				     thisfd->fd,
1131 				     "Error sending ENOENT reply to local user");
1132 			thisfd->bits.localsock.expected_replies = 0;
1133 			thisfd->bits.localsock.num_replies = 0;
1134 			thisfd->bits.localsock.in_progress = FALSE;
1135 			thisfd->bits.localsock.sent_out = FALSE;
1136 			return len;
1137 		}
1138 
1139 		/* If we already have a subthread then just signal it to start */
1140 		if (thisfd->bits.localsock.threadid) {
1141 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1142 			thisfd->bits.localsock.state = PRE_COMMAND;
1143 			pthread_cond_signal(&thisfd->bits.localsock.cond);
1144 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1145 			return len;
1146 		}
1147 
1148 		/* Create a pipe and add the reading end to our FD list */
1149 		pipe(comms_pipe);
1150 		newfd = malloc(sizeof(struct local_client));
1151 		if (!newfd) {
1152 			struct clvm_header reply;
1153 			close(comms_pipe[0]);
1154 			close(comms_pipe[1]);
1155 
1156 			reply.cmd = CLVMD_CMD_REPLY;
1157 			reply.status = ENOMEM;
1158 			reply.arglen = 0;
1159 			reply.flags = 0;
1160 			send_message(&reply, sizeof(reply), our_csid,
1161 				     thisfd->fd,
1162 				     "Error sending ENOMEM reply to local user");
1163 			return len;
1164 		}
1165 		DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1166 			 comms_pipe[1]);
1167 		newfd->fd = comms_pipe[0];
1168 		newfd->removeme = 0;
1169 		newfd->type = THREAD_PIPE;
1170 		newfd->callback = local_pipe_callback;
1171 		newfd->next = thisfd->next;
1172 		newfd->bits.pipe.client = thisfd;
1173 		newfd->bits.pipe.threadid = 0;
1174 		thisfd->next = newfd;
1175 
1176 		/* Store a cross link to the pipe */
1177 		thisfd->bits.localsock.pipe_client = newfd;
1178 
1179 		thisfd->bits.localsock.pipe = comms_pipe[1];
1180 
1181 		/* Make sure the thread has a copy of it's own ID */
1182 		newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1183 
1184 		/* Run the pre routine */
1185 		thisfd->bits.localsock.in_progress = TRUE;
1186 		thisfd->bits.localsock.state = PRE_COMMAND;
1187 		DEBUGLOG("Creating pre&post thread\n");
1188 		status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1189 			       pre_and_post_thread, thisfd);
1190 		DEBUGLOG("Created pre&post thread, state = %d\n", status);
1191 	}
1192 	return len;
1193 }
1194 
1195 /* Add a file descriptor from the cluster or comms interface to
1196    our list of FDs for select
1197 */
1198 int add_client(struct local_client *new_client)
1199 {
1200 	new_client->next = local_client_head.next;
1201 	local_client_head.next = new_client;
1202 
1203 	return 0;
1204 }
1205 
1206 /* Called when the pre-command has completed successfully - we
1207    now execute the real command on all the requested nodes */
1208 static int distribute_command(struct local_client *thisfd)
1209 {
1210 	struct clvm_header *inheader =
1211 	    (struct clvm_header *) thisfd->bits.localsock.cmd;
1212 	int len = thisfd->bits.localsock.cmd_len;
1213 
1214 	thisfd->xid = global_xid++;
1215 	DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1216 
1217 	/* Forward it to other nodes in the cluster if needed */
1218 	if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1219 		/* if node is empty then do it on the whole cluster */
1220 		if (inheader->node[0] == '\0') {
1221 			thisfd->bits.localsock.expected_replies =
1222 			    clops->get_num_nodes();
1223 			thisfd->bits.localsock.num_replies = 0;
1224 			thisfd->bits.localsock.sent_time = time(NULL);
1225 			thisfd->bits.localsock.in_progress = TRUE;
1226 			thisfd->bits.localsock.sent_out = TRUE;
1227 
1228 			/* Do it here first */
1229 			add_to_lvmqueue(thisfd, inheader, len, NULL);
1230 
1231 			DEBUGLOG("Sending message to all cluster nodes\n");
1232 			inheader->xid = thisfd->xid;
1233 			send_message(inheader, len, NULL, -1,
1234 				     "Error forwarding message to cluster");
1235 		} else {
1236                         /* Do it on a single node */
1237 			char csid[MAX_CSID_LEN];
1238 
1239 			if (clops->csid_from_name(csid, inheader->node)) {
1240 				/* This has already been checked so should not happen */
1241 				return 0;
1242 			} else {
1243 			        /* OK, found a node... */
1244 				thisfd->bits.localsock.expected_replies = 1;
1245 				thisfd->bits.localsock.num_replies = 0;
1246 				thisfd->bits.localsock.in_progress = TRUE;
1247 
1248 				/* Are we the requested node ?? */
1249 				if (memcmp(csid, our_csid, max_csid_len) == 0) {
1250 					DEBUGLOG("Doing command on local node only\n");
1251 					add_to_lvmqueue(thisfd, inheader, len, NULL);
1252 				} else {
1253 					DEBUGLOG("Sending message to single node: %s\n",
1254 						 inheader->node);
1255 					inheader->xid = thisfd->xid;
1256 					send_message(inheader, len,
1257 						     csid, -1,
1258 						     "Error forwarding message to cluster node");
1259 				}
1260 			}
1261 		}
1262 	} else {
1263 		/* Local explicitly requested, ignore nodes */
1264 		thisfd->bits.localsock.in_progress = TRUE;
1265 		thisfd->bits.localsock.expected_replies = 1;
1266 		thisfd->bits.localsock.num_replies = 0;
1267 		add_to_lvmqueue(thisfd, inheader, len, NULL);
1268 	}
1269 	return 0;
1270 }
1271 
1272 /* Process a command from a remote node and return the result */
1273 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1274 			    	   const char *csid)
1275 {
1276 	char *replyargs;
1277 	char nodename[max_cluster_member_name_len];
1278 	int replylen = 0;
1279 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1280 	int status;
1281 	int msg_malloced = 0;
1282 
1283 	/* Get the node name as we /may/ need it later */
1284 	clops->name_from_csid(csid, nodename);
1285 
1286 	DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1287 		 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1288 
1289 	/* Check for GOAWAY and sulk */
1290 	if (msg->cmd == CLVMD_CMD_GOAWAY) {
1291 
1292 		DEBUGLOG("Told to go away by %s\n", nodename);
1293 		log_error("Told to go away by %s\n", nodename);
1294 		exit(99);
1295 	}
1296 
1297 	/* Version check is internal - don't bother exposing it in
1298 	   clvmd-command.c */
1299 	if (msg->cmd == CLVMD_CMD_VERSION) {
1300 		int version_nums[3];
1301 		char node[256];
1302 
1303 		memcpy(version_nums, msg->args, sizeof(version_nums));
1304 
1305 		clops->name_from_csid(csid, node);
1306 		DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1307 			 node,
1308 			 ntohl(version_nums[0]),
1309 			 ntohl(version_nums[1]), ntohl(version_nums[2]));
1310 
1311 		if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1312 			struct clvm_header byebyemsg;
1313 			DEBUGLOG
1314 			    ("Telling node %s to go away because of incompatible version number\n",
1315 			     node);
1316 			log_notice
1317 			    ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1318 			     node, ntohl(version_nums[0]),
1319 			     ntohl(version_nums[1]), ntohl(version_nums[2]));
1320 
1321 			byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1322 			byebyemsg.status = 0;
1323 			byebyemsg.flags = 0;
1324 			byebyemsg.arglen = 0;
1325 			byebyemsg.clientid = 0;
1326 			clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1327 					     our_csid,
1328 					     "Error Sending GOAWAY message");
1329 		} else {
1330 			clops->add_up_node(csid);
1331 		}
1332 		return;
1333 	}
1334 
1335 	/* Allocate a default reply buffer */
1336 	replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1337 
1338 	if (replyargs != NULL) {
1339 		/* Run the command */
1340 		status =
1341 		    do_command(NULL, msg, msglen, &replyargs, buflen,
1342 			       &replylen);
1343 	} else {
1344 		status = ENOMEM;
1345 	}
1346 
1347 	/* If it wasn't a reply, then reply */
1348 	if (msg->cmd != CLVMD_CMD_REPLY) {
1349 		char *aggreply;
1350 
1351 		aggreply =
1352 		    realloc(replyargs, replylen + sizeof(struct clvm_header));
1353 		if (aggreply) {
1354 			struct clvm_header *agghead =
1355 			    (struct clvm_header *) aggreply;
1356 
1357 			replyargs = aggreply;
1358 			/* Move it up so there's room for a header in front of the data */
1359 			memmove(aggreply + offsetof(struct clvm_header, args),
1360 				replyargs, replylen);
1361 
1362 			agghead->xid = msg->xid;
1363 			agghead->cmd = CLVMD_CMD_REPLY;
1364 			agghead->status = status;
1365 			agghead->flags = 0;
1366 			agghead->clientid = msg->clientid;
1367 			agghead->arglen = replylen;
1368 			agghead->node[0] = '\0';
1369 			send_message(aggreply,
1370 				     sizeof(struct clvm_header) +
1371 				     replylen, csid, fd,
1372 				     "Error sending command reply");
1373 		} else {
1374 			struct clvm_header head;
1375 
1376 			DEBUGLOG("Error attempting to realloc return buffer\n");
1377 			/* Return a failure response */
1378 			head.cmd = CLVMD_CMD_REPLY;
1379 			head.status = ENOMEM;
1380 			head.flags = 0;
1381 			head.clientid = msg->clientid;
1382 			head.arglen = 0;
1383 			head.node[0] = '\0';
1384 			send_message(&head, sizeof(struct clvm_header), csid,
1385 				     fd, "Error sending ENOMEM command reply");
1386 			return;
1387 		}
1388 	}
1389 
1390 	/* Free buffer if it was malloced */
1391 	if (msg_malloced) {
1392 		free(msg);
1393 	}
1394 	free(replyargs);
1395 }
1396 
1397 /* Add a reply to a command to the list of replies for this client.
1398    If we have got a full set then send them to the waiting client down the local
1399    socket */
1400 static void add_reply_to_list(struct local_client *client, int status,
1401 			      const char *csid, const char *buf, int len)
1402 {
1403 	struct node_reply *reply;
1404 
1405 	pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1406 
1407 	/* Add it to the list of replies */
1408 	reply = malloc(sizeof(struct node_reply));
1409 	if (reply) {
1410 		reply->status = status;
1411 		clops->name_from_csid(csid, reply->node);
1412 		DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1413 
1414 		if (len > 0) {
1415 			reply->replymsg = malloc(len);
1416 			if (!reply->replymsg) {
1417 				reply->status = ENOMEM;
1418 			} else {
1419 				memcpy(reply->replymsg, buf, len);
1420 			}
1421 		} else {
1422 			reply->replymsg = NULL;
1423 		}
1424 		/* Hook it onto the reply chain */
1425 		reply->next = client->bits.localsock.replies;
1426 		client->bits.localsock.replies = reply;
1427 	} else {
1428 		/* It's all gone horribly wrong... */
1429 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1430 		send_local_reply(client, ENOMEM, client->fd);
1431 		return;
1432 	}
1433 	DEBUGLOG("Got %d replies, expecting: %d\n",
1434 		 client->bits.localsock.num_replies + 1,
1435 		 client->bits.localsock.expected_replies);
1436 
1437 	/* If we have the whole lot then do the post-process */
1438 	if (++client->bits.localsock.num_replies ==
1439 	    client->bits.localsock.expected_replies) {
1440 		/* Post-process the command */
1441 		if (client->bits.localsock.threadid) {
1442 			pthread_mutex_lock(&client->bits.localsock.mutex);
1443 			client->bits.localsock.state = POST_COMMAND;
1444 			pthread_cond_signal(&client->bits.localsock.cond);
1445 			pthread_mutex_unlock(&client->bits.localsock.mutex);
1446 		}
1447 	}
1448 	pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1449 }
1450 
/*
 * This is the thread that runs the PRE and POST commands for a particular
 * connection.
 *
 * arg is the struct local_client of the connection.  After each PRE and
 * each POST step an int status is written to the client's pipe
 * (bits.localsock.pipe).  The thread loops until bits.localsock.finished
 * is set and then calls pthread_exit(); it never returns normally.
 */
static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
{
	struct local_client *client = (struct local_client *) arg;
	int status;
	int write_status;
	sigset_t ss;
	int pipe_fd = client->bits.localsock.pipe;

	DEBUGLOG("in sub thread: client = %p\n", client);

	/* Don't start until the LVM thread is ready: taking and releasing
	   lvm_start_mutex blocks here until lvm_thread_fn() has unlocked it */
	pthread_mutex_lock(&lvm_start_mutex);
	pthread_mutex_unlock(&lvm_start_mutex);
	DEBUGLOG("Sub thread ready for work.\n");

	/* Ignore SIGUSR1 (handled by master process) but enable
	   SIGUSR2 (kills subthreads) */
	sigemptyset(&ss);
	sigaddset(&ss, SIGUSR1);
	pthread_sigmask(SIG_BLOCK, &ss, NULL);

	/* Reuse the same set: swap SIGUSR1 for SIGUSR2 and unblock it */
	sigdelset(&ss, SIGUSR1);
	sigaddset(&ss, SIGUSR2);
	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);

	/* Loop around doing PRE and POST functions until the client goes away */
	while (!client->bits.localsock.finished) {
		/* Execute the code */
		status = do_pre_command(client);

		if (status)
			client->bits.localsock.all_success = 0;

		DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);

		/* Tell the parent process we have finished this bit.
		   Retry on EINTR/EAGAIN; any other short or failed write is
		   logged and abandoned. */
		do {
			write_status = write(pipe_fd, &status, sizeof(int));
			if (write_status == sizeof(int))
				break;
			if (write_status < 0 &&
			    (errno == EINTR || errno == EAGAIN))
				continue;
			log_error("Error sending to pipe: %m\n");
			break;
		} while(1);

		/* A failed PRE command skips the POST step altogether */
		if (status) {
			client->bits.localsock.state = POST_COMMAND;
			goto next_pre;
		}

		/* We may need to wait for the condition variable before running the post command */
		pthread_mutex_lock(&client->bits.localsock.mutex);
		DEBUGLOG("Waiting to do post command - state = %d\n",
			 client->bits.localsock.state);

		/* NOTE(review): this is a single wait, not a loop on the
		   predicate, so any wakeup lets the thread proceed.  The
		   connection teardown code signals this condition with state
		   set to PRE_COMMAND, so turning this into a
		   "while (state != POST_COMMAND)" loop would change how
		   teardown interacts with this thread - confirm before
		   touching. */
		if (client->bits.localsock.state != POST_COMMAND) {
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}
		pthread_mutex_unlock(&client->bits.localsock.mutex);

		DEBUGLOG("Got post command condition...\n");

		/* POST function must always run, even if the client aborts */
		status = 0;
		do_post_command(client);

		/* Report completion of the POST step (always status 0) with
		   the same retry policy as above */
		do {
			write_status = write(pipe_fd, &status, sizeof(int));
			if (write_status == sizeof(int))
				break;
			if (write_status < 0 &&
			    (errno == EINTR || errno == EAGAIN))
				continue;
			log_error("Error sending to pipe: %m\n");
			break;
		} while(1);
next_pre:
		DEBUGLOG("Waiting for next pre command\n");

		/* Sleep until the state is PRE_COMMAND again or the client
		   has been marked finished (again a single wait) */
		pthread_mutex_lock(&client->bits.localsock.mutex);
		if (client->bits.localsock.state != PRE_COMMAND &&
		    !client->bits.localsock.finished) {
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}
		pthread_mutex_unlock(&client->bits.localsock.mutex);

		DEBUGLOG("Got pre command condition...\n");
	}
	DEBUGLOG("Subthread finished\n");
	pthread_exit((void *) 0);
}
1547 
1548 /* Process a command on the local node and store the result */
1549 static int process_local_command(struct clvm_header *msg, int msglen,
1550 				 struct local_client *client,
1551 				 unsigned short xid)
1552 {
1553 	char *replybuf = malloc(max_cluster_message);
1554 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1555 	int replylen = 0;
1556 	int status;
1557 
1558 	DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1559 		 decode_cmd(msg->cmd), msg, msglen, client);
1560 
1561 	if (replybuf == NULL)
1562 		return -1;
1563 
1564 	status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1565 
1566 	if (status)
1567 		client->bits.localsock.all_success = 0;
1568 
1569 	/* If we took too long then discard the reply */
1570 	if (xid == client->xid) {
1571 		add_reply_to_list(client, status, our_csid, replybuf, replylen);
1572 	} else {
1573 		DEBUGLOG
1574 		    ("Local command took too long, discarding xid %d, current is %d\n",
1575 		     xid, client->xid);
1576 	}
1577 
1578 	free(replybuf);
1579 	return status;
1580 }
1581 
1582 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1583 {
1584 	struct local_client *client = NULL;
1585 
1586 	client = find_client(msg->clientid);
1587 	if (!client) {
1588 		DEBUGLOG("Got message for unknown client 0x%x\n",
1589 			 msg->clientid);
1590 		log_error("Got message for unknown client 0x%x\n",
1591 			  msg->clientid);
1592 		return -1;
1593 	}
1594 
1595 	if (msg->status)
1596 		client->bits.localsock.all_success = 0;
1597 
1598 	/* Gather replies together for this client id */
1599 	if (msg->xid == client->xid) {
1600 		add_reply_to_list(client, msg->status, csid, msg->args,
1601 				  msg->arglen);
1602 	} else {
1603 		DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1604 			 msg->xid, client->xid);
1605 	}
1606 	return 0;
1607 }
1608 
1609 /* Send an aggregated reply back to the client */
1610 static void send_local_reply(struct local_client *client, int status, int fd)
1611 {
1612 	struct clvm_header *clientreply;
1613 	struct node_reply *thisreply = client->bits.localsock.replies;
1614 	char *replybuf;
1615 	char *ptr;
1616 	int message_len = 0;
1617 
1618 	DEBUGLOG("Send local reply\n");
1619 
1620 	/* Work out the total size of the reply */
1621 	while (thisreply) {
1622 		if (thisreply->replymsg)
1623 			message_len += strlen(thisreply->replymsg) + 1;
1624 		else
1625 			message_len++;
1626 
1627 		message_len += strlen(thisreply->node) + 1 + sizeof(int);
1628 
1629 		thisreply = thisreply->next;
1630 	}
1631 
1632 	/* Add in the size of our header */
1633 	message_len = message_len + sizeof(struct clvm_header) + 1;
1634 	replybuf = malloc(message_len);
1635 
1636 	clientreply = (struct clvm_header *) replybuf;
1637 	clientreply->status = status;
1638 	clientreply->cmd = CLVMD_CMD_REPLY;
1639 	clientreply->node[0] = '\0';
1640 	clientreply->flags = 0;
1641 
1642 	ptr = clientreply->args;
1643 
1644 	/* Add in all the replies, and free them as we go */
1645 	thisreply = client->bits.localsock.replies;
1646 	while (thisreply) {
1647 		struct node_reply *tempreply = thisreply;
1648 
1649 		strcpy(ptr, thisreply->node);
1650 		ptr += strlen(thisreply->node) + 1;
1651 
1652 		if (thisreply->status)
1653 			clientreply->flags |= CLVMD_FLAG_NODEERRS;
1654 
1655 		memcpy(ptr, &thisreply->status, sizeof(int));
1656 		ptr += sizeof(int);
1657 
1658 		if (thisreply->replymsg) {
1659 			strcpy(ptr, thisreply->replymsg);
1660 			ptr += strlen(thisreply->replymsg) + 1;
1661 		} else {
1662 			ptr[0] = '\0';
1663 			ptr++;
1664 		}
1665 		thisreply = thisreply->next;
1666 
1667 		free(tempreply->replymsg);
1668 		free(tempreply);
1669 	}
1670 
1671 	/* Terminate with an empty node name */
1672 	*ptr = '\0';
1673 
1674 	clientreply->arglen = ptr - clientreply->args + 1;
1675 
1676 	/* And send it */
1677 	send_message(replybuf, message_len, our_csid, fd,
1678 		     "Error sending REPLY to client");
1679 	free(replybuf);
1680 
1681 	/* Reset comms variables */
1682 	client->bits.localsock.replies = NULL;
1683 	client->bits.localsock.expected_replies = 0;
1684 	client->bits.localsock.in_progress = FALSE;
1685 	client->bits.localsock.sent_out = FALSE;
1686 }
1687 
1688 /* Just free a reply chain baceuse it wasn't used. */
1689 static void free_reply(struct local_client *client)
1690 {
1691 	/* Add in all the replies, and free them as we go */
1692 	struct node_reply *thisreply = client->bits.localsock.replies;
1693 	while (thisreply) {
1694 		struct node_reply *tempreply = thisreply;
1695 
1696 		thisreply = thisreply->next;
1697 
1698 		free(tempreply->replymsg);
1699 		free(tempreply);
1700 	}
1701 	client->bits.localsock.replies = NULL;
1702 }
1703 
1704 /* Send our version number to the cluster */
1705 static void send_version_message()
1706 {
1707 	char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1708 	struct clvm_header *msg = (struct clvm_header *) message;
1709 	int version_nums[3];
1710 
1711 	msg->cmd = CLVMD_CMD_VERSION;
1712 	msg->status = 0;
1713 	msg->flags = 0;
1714 	msg->clientid = 0;
1715 	msg->arglen = sizeof(version_nums);
1716 
1717 	version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1718 	version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1719 	version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1720 
1721 	memcpy(&msg->args, version_nums, sizeof(version_nums));
1722 
1723 	hton_clvm(msg);
1724 
1725 	clops->cluster_send_message(message, sizeof(message), NULL,
1726 			     "Error Sending version number");
1727 }
1728 
1729 /* Send a message to either a local client or another server */
1730 static int send_message(void *buf, int msglen, const char *csid, int fd,
1731 			const char *errtext)
1732 {
1733 	int len = 0;
1734 	int saved_errno = 0;
1735 	struct timespec delay;
1736 	struct timespec remtime;
1737 
1738 	int retry_cnt = 0;
1739 
1740 	/* Send remote messages down the cluster socket */
1741 	if (csid == NULL || !ISLOCAL_CSID(csid)) {
1742 		hton_clvm((struct clvm_header *) buf);
1743 		return clops->cluster_send_message(buf, msglen, csid, errtext);
1744 	} else {
1745 		int ptr = 0;
1746 
1747 		/* Make sure it all goes */
1748 		do {
1749 			if (retry_cnt > MAX_RETRIES)
1750 			{
1751 				errno = saved_errno;
1752 				log_error("%s", errtext);
1753 				errno = saved_errno;
1754 				break;
1755 			}
1756 
1757 			len = write(fd, buf + ptr, msglen - ptr);
1758 
1759 			if (len <= 0) {
1760 				if (errno == EINTR)
1761 					continue;
1762 				if (errno == EAGAIN ||
1763 				    errno == EIO ||
1764 				    errno == ENOSPC) {
1765 					saved_errno = errno;
1766 					retry_cnt++;
1767 
1768 					delay.tv_sec = 0;
1769 					delay.tv_nsec = 100000;
1770 					remtime.tv_sec = 0;
1771 					remtime.tv_nsec = 0;
1772 					(void) nanosleep (&delay, &remtime);
1773 
1774 					continue;
1775 				}
1776 				log_error("%s", errtext);
1777 				break;
1778 			}
1779 			ptr += len;
1780 		} while (ptr < msglen);
1781 	}
1782 	return len;
1783 }
1784 
1785 static int process_work_item(struct lvm_thread_cmd *cmd)
1786 {
1787 	/* If msg is NULL then this is a cleanup request */
1788 	if (cmd->msg == NULL) {
1789 		DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1790 		cmd_client_cleanup(cmd->client);
1791 		free(cmd->client);
1792 		return 0;
1793 	}
1794 
1795 	if (!cmd->remote) {
1796 		DEBUGLOG("process_work_item: local\n");
1797 		process_local_command(cmd->msg, cmd->msglen, cmd->client,
1798 				      cmd->xid);
1799 	} else {
1800 		DEBUGLOG("process_work_item: remote\n");
1801 		process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1802 				       cmd->csid);
1803 	}
1804 	return 0;
1805 }
1806 
1807 /*
1808  * Routine that runs in the "LVM thread".
1809  */
1810 static void lvm_thread_fn(void *arg)
1811 {
1812 	struct dm_list *cmdl, *tmp;
1813 	sigset_t ss;
1814 	int using_gulm = (int)(long)arg;
1815 
1816 	DEBUGLOG("LVM thread function started\n");
1817 
1818 	/* Ignore SIGUSR1 & 2 */
1819 	sigemptyset(&ss);
1820 	sigaddset(&ss, SIGUSR1);
1821 	sigaddset(&ss, SIGUSR2);
1822 	pthread_sigmask(SIG_BLOCK, &ss, NULL);
1823 
1824 	/* Initialise the interface to liblvm */
1825 	init_lvm(using_gulm);
1826 
1827 	/* Allow others to get moving */
1828 	pthread_mutex_unlock(&lvm_start_mutex);
1829 
1830 	/* Now wait for some actual work */
1831 	for (;;) {
1832 		DEBUGLOG("LVM thread waiting for work\n");
1833 
1834 		pthread_mutex_lock(&lvm_thread_mutex);
1835 		if (dm_list_empty(&lvm_cmd_head))
1836 			pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1837 
1838 		dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1839 			struct lvm_thread_cmd *cmd;
1840 
1841 			cmd =
1842 			    dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1843 			dm_list_del(&cmd->list);
1844 			pthread_mutex_unlock(&lvm_thread_mutex);
1845 
1846 			process_work_item(cmd);
1847 			free(cmd->msg);
1848 			free(cmd);
1849 
1850 			pthread_mutex_lock(&lvm_thread_mutex);
1851 		}
1852 		pthread_mutex_unlock(&lvm_thread_mutex);
1853 	}
1854 }
1855 
1856 /* Pass down some work to the LVM thread */
1857 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1858 			   int msglen, const char *csid)
1859 {
1860 	struct lvm_thread_cmd *cmd;
1861 
1862 	cmd = malloc(sizeof(struct lvm_thread_cmd));
1863 	if (!cmd)
1864 		return ENOMEM;
1865 
1866 	if (msglen) {
1867 		cmd->msg = malloc(msglen);
1868 		if (!cmd->msg) {
1869 			log_error("Unable to allocate buffer space\n");
1870 			free(cmd);
1871 			return -1;
1872 		}
1873 		memcpy(cmd->msg, msg, msglen);
1874 	}
1875 	else {
1876 		cmd->msg = NULL;
1877 	}
1878 	cmd->client = client;
1879 	cmd->msglen = msglen;
1880 	cmd->xid = client->xid;
1881 
1882 	if (csid) {
1883 		memcpy(cmd->csid, csid, max_csid_len);
1884 		cmd->remote = 1;
1885 	} else {
1886 		cmd->remote = 0;
1887 	}
1888 
1889 	DEBUGLOG
1890 	    ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1891 	     cmd, client, msg, msglen, csid, cmd->xid);
1892 	pthread_mutex_lock(&lvm_thread_mutex);
1893 	dm_list_add(&lvm_cmd_head, &cmd->list);
1894 	pthread_cond_signal(&lvm_thread_cond);
1895 	pthread_mutex_unlock(&lvm_thread_mutex);
1896 
1897 	return 0;
1898 }
1899 
1900 /* Return 0 if we can talk to an existing clvmd */
1901 static int check_local_clvmd(void)
1902 {
1903 	int local_socket;
1904 	struct sockaddr_un sockaddr;
1905 	int ret = 0;
1906 
1907 	/* Open local socket */
1908 	if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1909 		return -1;
1910 	}
1911 
1912 	memset(&sockaddr, 0, sizeof(sockaddr));
1913 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1914 	sockaddr.sun_family = AF_UNIX;
1915 
1916 	if (connect(local_socket,(struct sockaddr *) &sockaddr,
1917 		    sizeof(sockaddr))) {
1918 		ret = -1;
1919 	}
1920 
1921 	close(local_socket);
1922 	return ret;
1923 }
1924 
1925 
1926 /* Open the local socket, that's the one we talk to libclvm down */
1927 static int open_local_sock()
1928 {
1929 	int local_socket;
1930 	struct sockaddr_un sockaddr;
1931 
1932 	/* Open local socket */
1933 	if (CLVMD_SOCKNAME[0] != '\0')
1934 		unlink(CLVMD_SOCKNAME);
1935 	local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1936 	if (local_socket < 0) {
1937 		log_error("Can't create local socket: %m");
1938 		return -1;
1939 	}
1940 	/* Set Close-on-exec & non-blocking */
1941 	fcntl(local_socket, F_SETFD, 1);
1942 	fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1943 
1944 	memset(&sockaddr, 0, sizeof(sockaddr));
1945 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1946 	sockaddr.sun_family = AF_UNIX;
1947 	if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1948 		log_error("can't bind local socket: %m");
1949 		close(local_socket);
1950 		return -1;
1951 	}
1952 	if (listen(local_socket, 1) != 0) {
1953 		log_error("listen local: %m");
1954 		close(local_socket);
1955 		return -1;
1956 	}
1957 	if (CLVMD_SOCKNAME[0] != '\0')
1958 		chmod(CLVMD_SOCKNAME, 0600);
1959 
1960 	return local_socket;
1961 }
1962 
1963 void process_message(struct local_client *client, const char *buf, int len,
1964 		     const char *csid)
1965 {
1966 	struct clvm_header *inheader;
1967 
1968 	inheader = (struct clvm_header *) buf;
1969 	ntoh_clvm(inheader);	/* Byteswap fields */
1970 	if (inheader->cmd == CLVMD_CMD_REPLY)
1971 		process_reply(inheader, len, csid);
1972 	else
1973 		add_to_lvmqueue(client, inheader, len, csid);
1974 }
1975 
1976 
/*
 * Per-node callback used by check_all_clvmds_running(): for a node that is
 * not up, add an EHOSTDOWN reply carrying an explanatory message to the
 * client's reply list.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	/* sizeof includes the terminating NUL, which is sent as well */
	static const char down_text[] = "CLVMD not running";

	if (node_up)
		return;

	add_reply_to_list(client, EHOSTDOWN, csid, down_text,
			  sizeof(down_text));
}
1984 
1985 /* Check to see if all CLVMDs are running (ie one on
1986    every node in the cluster).
1987    If not, returns -1 and prints out a list of errant nodes */
1988 static int check_all_clvmds_running(struct local_client *client)
1989 {
1990 	DEBUGLOG("check_all_clvmds_running\n");
1991 	return clops->cluster_do_node_callback(client, check_all_callback);
1992 }
1993 
1994 /* Return a local_client struct given a client ID.
1995    client IDs are in network byte order */
1996 static struct local_client *find_client(int clientid)
1997 {
1998 	struct local_client *thisfd;
1999 	for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2000 		if (thisfd->fd == ntohl(clientid))
2001 			return thisfd;
2002 	}
2003 	return NULL;
2004 }
2005 
2006 /* Byte-swapping routines for the header so we
2007    work in a heterogeneous environment */
2008 static void hton_clvm(struct clvm_header *hdr)
2009 {
2010 	hdr->status = htonl(hdr->status);
2011 	hdr->arglen = htonl(hdr->arglen);
2012 	hdr->xid = htons(hdr->xid);
2013 	/* Don't swap clientid as it's only a token as far as
2014 	   remote nodes are concerned */
2015 }
2016 
2017 static void ntoh_clvm(struct clvm_header *hdr)
2018 {
2019 	hdr->status = ntohl(hdr->status);
2020 	hdr->arglen = ntohl(hdr->arglen);
2021 	hdr->xid = ntohs(hdr->xid);
2022 }
2023 
/*
 * Handler for SIGUSR2 - sent to kill subthreads.
 * It only emits a debug message and changes no state.
 *
 * NOTE(review): DEBUGLOG runs in signal context here -- confirm that it
 * is async-signal-safe.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;	/* signal number is not needed */

	DEBUGLOG("SIGUSR2 received\n");
}
2030 
2031 static void sigterm_handler(int sig)
2032 {
2033 	DEBUGLOG("SIGTERM received\n");
2034 	quit = 1;
2035 	return;
2036 }
2037 
2038 static void sighup_handler(int sig)
2039 {
2040         DEBUGLOG("got SIGHUP\n");
2041 	reread_config = 1;
2042 }
2043 
2044 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2045 {
2046 	return clops->sync_lock(resource, mode, flags, lockid);
2047 }
2048 
2049 int sync_unlock(const char *resource, int lockid)
2050 {
2051 	return clops->sync_unlock(resource, lockid);
2052 }
2053 
2054 static if_type_t parse_cluster_interface(char *ifname)
2055 {
2056 	if_type_t iface = IF_AUTO;
2057 
2058 	if (!strcmp(ifname, "auto"))
2059 		iface = IF_AUTO;
2060 	if (!strcmp(ifname, "cman"))
2061 		iface = IF_CMAN;
2062 	if (!strcmp(ifname, "gulm"))
2063 		iface = IF_GULM;
2064 	if (!strcmp(ifname, "openais"))
2065 		iface = IF_OPENAIS;
2066 	if (!strcmp(ifname, "corosync"))
2067 		iface = IF_COROSYNC;
2068 
2069 	return iface;
2070 }
2071 
2072 /*
2073  * Try and find a cluster system in corosync's objdb, if it is running. This is
2074  * only called if the command-line option is not present, and if it fails
2075  * we still try the interfaces in order.
2076  */
2077 static if_type_t get_cluster_type()
2078 {
2079 #ifdef HAVE_COROSYNC_CONFDB_H
2080 	confdb_handle_t handle;
2081 	if_type_t type = IF_AUTO;
2082 	int result;
2083 	char buf[255];
2084 	size_t namelen = sizeof(buf);
2085 	hdb_handle_t cluster_handle;
2086 	hdb_handle_t clvmd_handle;
2087 	confdb_callbacks_t callbacks = {
2088 		.confdb_key_change_notify_fn = NULL,
2089 		.confdb_object_create_change_notify_fn = NULL,
2090 		.confdb_object_delete_change_notify_fn = NULL
2091 	};
2092 
2093 	result = confdb_initialize (&handle, &callbacks);
2094         if (result != CS_OK)
2095 		return type;
2096 
2097         result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2098 	if (result != CS_OK)
2099 		goto out;
2100 
2101         result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2102         if (result != CS_OK)
2103 		goto out;
2104 
2105         result = confdb_object_find_start(handle, cluster_handle);
2106 	if (result != CS_OK)
2107 		goto out;
2108 
2109         result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2110         if (result != CS_OK)
2111 		goto out;
2112 
2113         result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2114         if (result != CS_OK)
2115 		goto out;
2116 
2117 	buf[namelen] = '\0';
2118 	type = parse_cluster_interface(buf);
2119 	DEBUGLOG("got interface type '%s' from confdb\n", buf);
2120 out:
2121 	confdb_finalize(handle);
2122 	return type;
2123 #else
2124 	return IF_AUTO;
2125 #endif
2126 }
2127