1 /*	$NetBSD: clvmd.c,v 1.2 2015/11/09 00:53:57 dholland Exp $	*/
2 
3 /*
4  * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6  *
7  * This file is part of LVM2.
8  *
9  * This copyrighted material is made available to anyone wishing to use,
10  * modify, copy, or redistribute it subject to the terms and conditions
11  * of the GNU General Public License v.2.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16  */
17 
18 /*
19  * CLVMD: Cluster LVM daemon
20  */
21 
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
24 
25 #include <configure.h>
26 #include <libdevmapper.h>
27 
28 #include <pthread.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 #include <sys/un.h>
34 #include <sys/time.h>
35 #include <sys/ioctl.h>
36 #include <sys/utsname.h>
37 #include <netinet/in.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stddef.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <getopt.h>
46 #include <syslog.h>
47 #include <errno.h>
48 #include <limits.h>
49 #ifdef HAVE_COROSYNC_CONFDB_H
50 #include <corosync/confdb.h>
51 #endif
52 
53 #include "clvmd-comms.h"
54 #include "lvm-functions.h"
55 #include "clvm.h"
56 #include "lvm-version.h"
57 #include "clvmd.h"
58 #include "refresh_clvmd.h"
59 #include "lvm-logging.h"
60 
61 #ifndef TRUE
62 #define TRUE 1
63 #endif
64 #ifndef FALSE
65 #define FALSE 0
66 #endif
67 
68 #define MAX_RETRIES 4
69 
70 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
71 
/* Head of the fd list. Also contains
   the cluster_socket details.
   main() fills in its fd, type and callback from the cluster interface,
   and main_loop() refreshes the fd on every pass. */
static struct local_client local_client_head;

static unsigned short global_xid = 0;	/* Last transaction ID issued */

/* Operations table of the cluster interface in use; main() sets it from
   whichever init_*_cluster() call succeeds and gives up if it stays NULL */
struct cluster_ops *clops = NULL;

/* This node's cluster ID, filled in by clops->get_our_csid() in main() */
static char our_csid[MAX_CSID_LEN];
/* Limits of the selected cluster interface, assigned in main() */
static unsigned max_csid_len;
static unsigned max_cluster_message;
static unsigned max_cluster_member_name_len;
84 
/* Structure of items on the LVM thread list */
struct lvm_thread_cmd {
	struct dm_list list;	/* List linkage; presumably chained onto lvm_cmd_head - TODO confirm */

	struct local_client *client;	/* Client the queued item belongs to */
	struct clvm_header *msg;	/* Command message; NOTE(review): main_loop() queues NULL here for client cleanup - confirm how the consumer treats it */
	char csid[MAX_CSID_LEN];	/* presumably the cluster ID of the originating node - TODO confirm */
	int remote;		/* Flag */
	int msglen;		/* presumably the length of msg in bytes - TODO confirm */
	unsigned short xid;	/* presumably the transaction ID of the request - TODO confirm */
};
96 
/* Debug destination: set from -d in main() and tested by debuglog() */
debug_t debug;
/* Thread created by main() to run lvm_thread_fn() */
static pthread_t lvm_thread;
/* Initialised in main(); presumably protect lvm_cmd_head - TODO confirm */
static pthread_mutex_t lvm_thread_mutex;
static pthread_cond_t lvm_thread_cond;
/* Locked by main() just before the LVM thread is created */
static pthread_mutex_t lvm_start_mutex;
/* List head initialised by main() with dm_list_init() */
static struct dm_list lvm_cmd_head;
/* main_loop() keeps running while this is zero */
static volatile sig_atomic_t quit = 0;
/* When non-zero, main_loop() clears it and calls clops->reread_config() */
static volatile sig_atomic_t reread_config = 0;
/* Pipe created by be_daemon(): child_init_signal() writes the startup
   status to [1], the waiting parent reads it from [0] */
static int child_pipe[2];
106 
107 /* Reasons the daemon failed initialisation */
108 #define DFAIL_INIT       1
109 #define DFAIL_LOCAL_SOCK 2
110 #define DFAIL_CLUSTER_IF 3
111 #define DFAIL_MALLOC     4
112 #define DFAIL_TIMEOUT    5
113 #define SUCCESS          0
114 
115 typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;
116 
117 typedef void *(lvm_pthread_fn_t)(void*);
118 
119 /* Prototypes for code further down */
120 static void sigusr2_handler(int sig);
121 static void sighup_handler(int sig);
122 static void sigterm_handler(int sig);
123 static void send_local_reply(struct local_client *client, int status,
124 			     int clientid);
125 static void free_reply(struct local_client *client);
126 static void send_version_message(void);
127 static void *pre_and_post_thread(void *arg);
128 static int send_message(void *buf, int msglen, const char *csid, int fd,
129 			const char *errtext);
130 static int read_from_local_sock(struct local_client *thisfd);
131 static int process_local_command(struct clvm_header *msg, int msglen,
132 				 struct local_client *client,
133 				 unsigned short xid);
134 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
135 				   const char *csid);
136 static int process_reply(const struct clvm_header *msg, int msglen,
137 			 const char *csid);
138 static int open_local_sock(void);
139 static int check_local_clvmd(void);
140 static struct local_client *find_client(int clientid);
141 static void main_loop(int local_sock, int cmd_timeout);
142 static void be_daemon(int start_timeout);
143 static int check_all_clvmds_running(struct local_client *client);
144 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
145 				     int len, const char *csid,
146 				     struct local_client **new_client);
147 static void lvm_thread_fn(void *) __attribute__ ((noreturn));
148 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
149 			   int msglen, const char *csid);
150 static int distribute_command(struct local_client *thisfd);
151 static void hton_clvm(struct clvm_header *hdr);
152 static void ntoh_clvm(struct clvm_header *hdr);
153 static void add_reply_to_list(struct local_client *client, int status,
154 			      const char *csid, const char *buf, int len);
155 static if_type_t parse_cluster_interface(char *ifname);
156 static if_type_t get_cluster_type(void);
157 
/*
 * Print the command-line synopsis and option summary.
 *
 * prog: program name shown on the synopsis line.
 * file: stream that receives the help text.
 *
 * The last line lists the cluster managers that were compiled in, as
 * selected by the USE_* build macros.
 */
static void usage(char *prog, FILE *file)
{
	/* Synopsis: the only part that depends on an argument */
	fprintf(file, "Usage:\n%s [Vhd]\n\n", prog);

	/* Fixed option descriptions */
	fputs("   -V       Show version of clvmd\n"
	      "   -h       Show this help information\n"
	      "   -d       Set debug level\n"
	      "            If starting clvmd then don't fork, run in the foreground\n"
	      "   -R       Tell all running clvmds in the cluster to reload their device cache\n"
	      "   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n"
	      "   -t<secs> Command timeout (default 60 seconds)\n"
	      "   -T<secs> Startup timeout (default none)\n"
	      "   -I<cmgr> Cluster manager (default: auto)\n"
	      "            Available cluster managers: ", file);

	/* One word per cluster manager built into this binary */
#ifdef USE_COROSYNC
	fputs("corosync ", file);
#endif
#ifdef USE_CMAN
	fputs("cman ", file);
#endif
#ifdef USE_OPENAIS
	fputs("openais ", file);
#endif
#ifdef USE_GULM
	fputs("gulm ", file);
#endif
	fputs("\n", file);
}
187 
188 /* Called to signal the parent how well we got on during initialisation */
child_init_signal(int status)189 static void child_init_signal(int status)
190 {
191         if (child_pipe[1]) {
192 	        write(child_pipe[1], &status, sizeof(status));
193 		close(child_pipe[1]);
194 	}
195 	if (status)
196 	        exit(status);
197 }
198 
199 
debuglog(const char * fmt,...)200 void debuglog(const char *fmt, ...)
201 {
202 	time_t P;
203 	va_list ap;
204 	static int syslog_init = 0;
205 
206 	if (debug == DEBUG_STDERR) {
207 		va_start(ap,fmt);
208 		time(&P);
209 		fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
210 		vfprintf(stderr, fmt, ap);
211 		va_end(ap);
212 	}
213 	if (debug == DEBUG_SYSLOG) {
214 		if (!syslog_init) {
215 			openlog("clvmd", LOG_PID, LOG_DAEMON);
216 			syslog_init = 1;
217 		}
218 
219 		va_start(ap,fmt);
220 		vsyslog(LOG_DEBUG, fmt, ap);
221 		va_end(ap);
222 	}
223 }
224 
decode_cmd(unsigned char cmdl)225 static const char *decode_cmd(unsigned char cmdl)
226 {
227 	static char buf[128];
228 	const char *command;
229 
230 	switch (cmdl) {
231 	case CLVMD_CMD_TEST:
232 		command = "TEST";
233 		break;
234 	case CLVMD_CMD_LOCK_VG:
235 		command = "LOCK_VG";
236 		break;
237 	case CLVMD_CMD_LOCK_LV:
238 		command = "LOCK_LV";
239 		break;
240 	case CLVMD_CMD_REFRESH:
241 		command = "REFRESH";
242 		break;
243 	case CLVMD_CMD_SET_DEBUG:
244 		command = "SET_DEBUG";
245 		break;
246 	case CLVMD_CMD_GET_CLUSTERNAME:
247 		command = "GET_CLUSTERNAME";
248 		break;
249 	case CLVMD_CMD_VG_BACKUP:
250 		command = "VG_BACKUP";
251 		break;
252 	case CLVMD_CMD_REPLY:
253 		command = "REPLY";
254 		break;
255 	case CLVMD_CMD_VERSION:
256 		command = "VERSION";
257 		break;
258 	case CLVMD_CMD_GOAWAY:
259 		command = "GOAWAY";
260 		break;
261 	case CLVMD_CMD_LOCK:
262 		command = "LOCK";
263 		break;
264 	case CLVMD_CMD_UNLOCK:
265 		command = "UNLOCK";
266 		break;
267 	case CLVMD_CMD_LOCK_QUERY:
268 		command = "LOCK_QUERY";
269 		break;
270 	default:
271 		command = "unknown";
272 		break;
273 	}
274 
275 	sprintf(buf, "%s (0x%x)", command, cmdl);
276 
277 	return buf;
278 }
279 
main(int argc,char * argv[])280 int main(int argc, char *argv[])
281 {
282 	int local_sock;
283 	struct local_client *newfd;
284 	struct utsname nodeinfo;
285 	signed char opt;
286 	int cmd_timeout = DEFAULT_CMD_TIMEOUT;
287 	int start_timeout = 0;
288 	if_type_t cluster_iface = IF_AUTO;
289 	sigset_t ss;
290 	int using_gulm = 0;
291 	int debug_opt = 0;
292 	int clusterwide_opt = 0;
293 
294 	/* Deal with command-line arguments */
295 	opterr = 0;
296 	optind = 0;
297 	while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
298 		switch (opt) {
299 		case 'h':
300 			usage(argv[0], stdout);
301 			exit(0);
302 
303 		case '?':
304 			usage(argv[0], stderr);
305 			exit(0);
306 
307 		case 'R':
308 			return refresh_clvmd()==1?0:1;
309 
310 		case 'C':
311 			clusterwide_opt = 1;
312 			break;
313 
314 		case 'd':
315 			debug_opt = 1;
316 			if (optarg)
317 				debug = atoi(optarg);
318 			else
319 				debug = DEBUG_STDERR;
320 			break;
321 
322 		case 't':
323 			cmd_timeout = atoi(optarg);
324 			if (!cmd_timeout) {
325 				fprintf(stderr, "command timeout is invalid\n");
326 				usage(argv[0], stderr);
327 				exit(1);
328 			}
329 			break;
330 		case 'I':
331 			cluster_iface = parse_cluster_interface(optarg);
332 			break;
333 		case 'T':
334 			start_timeout = atoi(optarg);
335 			if (start_timeout <= 0) {
336 				fprintf(stderr, "startup timeout is invalid\n");
337 				usage(argv[0], stderr);
338 				exit(1);
339 			}
340 			break;
341 
342 		case 'V':
343 		        printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
344 			printf("Protocol version:           %d.%d.%d\n",
345 			       CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
346 			       CLVMD_PATCH_VERSION);
347 			exit(1);
348 			break;
349 
350 		}
351 	}
352 
353 	/* Setting debug options on an existing clvmd */
354 	if (debug_opt && !check_local_clvmd()) {
355 
356 		/* Sending to stderr makes no sense for a detached daemon */
357 		if (debug == DEBUG_STDERR)
358 			debug = DEBUG_SYSLOG;
359 		return debug_clvmd(debug, clusterwide_opt)==1?0:1;
360 	}
361 
362 	/* Fork into the background (unless requested not to) */
363 	if (debug != DEBUG_STDERR) {
364 		be_daemon(start_timeout);
365 	}
366 
367 	DEBUGLOG("CLVMD started\n");
368 
369 	/* Open the Unix socket we listen for commands on.
370 	   We do this before opening the cluster socket so that
371 	   potential clients will block rather than error if we are running
372 	   but the cluster is not ready yet */
373 	local_sock = open_local_sock();
374 	if (local_sock < 0)
375 		child_init_signal(DFAIL_LOCAL_SOCK);
376 
377 	/* Set up signal handlers, USR1 is for cluster change notifications (in cman)
378 	   USR2 causes child threads to exit.
379 	   HUP causes gulm version to re-read nodes list from CCS.
380 	   PIPE should be ignored */
381 	signal(SIGUSR2, sigusr2_handler);
382 	signal(SIGHUP,  sighup_handler);
383 	signal(SIGPIPE, SIG_IGN);
384 
385 	/* Block SIGUSR2/SIGINT/SIGTERM in process */
386 	sigemptyset(&ss);
387 	sigaddset(&ss, SIGUSR2);
388 	sigaddset(&ss, SIGINT);
389 	sigaddset(&ss, SIGTERM);
390 	sigprocmask(SIG_BLOCK, &ss, NULL);
391 
392 	/* Initialise the LVM thread variables */
393 	dm_list_init(&lvm_cmd_head);
394 	pthread_mutex_init(&lvm_thread_mutex, NULL);
395 	pthread_cond_init(&lvm_thread_cond, NULL);
396 	pthread_mutex_init(&lvm_start_mutex, NULL);
397 	init_lvhash();
398 
399 	/* Start the cluster interface */
400 	if (cluster_iface == IF_AUTO)
401 		cluster_iface = get_cluster_type();
402 
403 #ifdef USE_CMAN
404 	if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
405 		max_csid_len = CMAN_MAX_CSID_LEN;
406 		max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
407 		max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
408 		syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
409 	}
410 #endif
411 #ifdef USE_GULM
412 	if (!clops)
413 		if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
414 			max_csid_len = GULM_MAX_CSID_LEN;
415 			max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
416 			max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
417 			using_gulm = 1;
418 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
419 		}
420 #endif
421 #ifdef USE_COROSYNC
422 	if (!clops)
423 		if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
424 			max_csid_len = COROSYNC_CSID_LEN;
425 			max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
426 			max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
427 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
428 		}
429 #endif
430 #ifdef USE_OPENAIS
431 	if (!clops)
432 		if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
433 			max_csid_len = OPENAIS_CSID_LEN;
434 			max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
435 			max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
436 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
437 		}
438 #endif
439 
440 	if (!clops) {
441 		DEBUGLOG("Can't initialise cluster interface\n");
442 		log_error("Can't initialise cluster interface\n");
443 		child_init_signal(DFAIL_CLUSTER_IF);
444 	}
445 	DEBUGLOG("Cluster ready, doing some more initialisation\n");
446 
447 	/* Save our CSID */
448 	uname(&nodeinfo);
449 	clops->get_our_csid(our_csid);
450 
451 	/* Initialise the FD list head */
452 	local_client_head.fd = clops->get_main_cluster_fd();
453 	local_client_head.type = CLUSTER_MAIN_SOCK;
454 	local_client_head.callback = clops->cluster_fd_callback;
455 
456 	/* Add the local socket to the list */
457 	newfd = malloc(sizeof(struct local_client));
458 	if (!newfd)
459 	        child_init_signal(DFAIL_MALLOC);
460 
461 	newfd->fd = local_sock;
462 	newfd->removeme = 0;
463 	newfd->type = LOCAL_RENDEZVOUS;
464 	newfd->callback = local_rendezvous_callback;
465 	newfd->next = local_client_head.next;
466 	local_client_head.next = newfd;
467 
468 	/* This needs to be started after cluster initialisation
469 	   as it may need to take out locks */
470 	DEBUGLOG("starting LVM thread\n");
471 
472 	/* Don't let anyone else to do work until we are started */
473 	pthread_mutex_lock(&lvm_start_mutex);
474 	pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
475 			(void *)(long)using_gulm);
476 
477 	/* Tell the rest of the cluster our version number */
478 	/* CMAN can do this immediately, gulm needs to wait until
479 	   the core initialisation has finished and the node list
480 	   has been gathered */
481 	if (clops->cluster_init_completed)
482 		clops->cluster_init_completed();
483 
484 	DEBUGLOG("clvmd ready for work\n");
485 	child_init_signal(SUCCESS);
486 
487 	/* Try to shutdown neatly */
488 	signal(SIGTERM, sigterm_handler);
489 	signal(SIGINT, sigterm_handler);
490 
491 	/* Do some work */
492 	main_loop(local_sock, cmd_timeout);
493 
494 	destroy_lvm();
495 
496 	return 0;
497 }
498 
/* Called when the GuLM cluster layer has completed initialisation.
   We send the version message.
   Takes no arguments and returns nothing; its only action is the call to
   send_version_message(). */
void clvmd_cluster_init_completed()
{
	send_version_message();
}
505 
/* Data on a connected socket.
   Callback installed on accepted local client connections (see
   local_rendezvous_callback).  buf, len and csid are not used here.
   Never creates a client, so *new_client is set to NULL.
   Returns whatever read_from_local_sock() returns for this client. */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	*new_client = NULL;
	return read_from_local_sock(thisfd);
}
514 
515 /* Data on a connected socket */
local_rendezvous_callback(struct local_client * thisfd,char * buf,int len,const char * csid,struct local_client ** new_client)516 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
517 				     int len, const char *csid,
518 				     struct local_client **new_client)
519 {
520 	/* Someone connected to our local socket, accept it. */
521 
522 	struct sockaddr_un socka;
523 	struct local_client *newfd;
524 	socklen_t sl = sizeof(socka);
525 	int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
526 
527 	if (client_fd == -1 && errno == EINTR)
528 		return 1;
529 
530 	if (client_fd >= 0) {
531 		newfd = malloc(sizeof(struct local_client));
532 		if (!newfd) {
533 			close(client_fd);
534 			return 1;
535 		}
536 		newfd->fd = client_fd;
537 		newfd->type = LOCAL_SOCK;
538 		newfd->xid = 0;
539 		newfd->removeme = 0;
540 		newfd->callback = local_sock_callback;
541 		newfd->bits.localsock.replies = NULL;
542 		newfd->bits.localsock.expected_replies = 0;
543 		newfd->bits.localsock.cmd = NULL;
544 		newfd->bits.localsock.in_progress = FALSE;
545 		newfd->bits.localsock.sent_out = FALSE;
546 		newfd->bits.localsock.threadid = 0;
547 		newfd->bits.localsock.finished = 0;
548 		newfd->bits.localsock.pipe_client = NULL;
549 		newfd->bits.localsock.private = NULL;
550 		newfd->bits.localsock.all_success = 1;
551 		DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
552 		*new_client = newfd;
553 	}
554 	return 1;
555 }
556 
local_pipe_callback(struct local_client * thisfd,char * buf,int maxlen,const char * csid,struct local_client ** new_client)557 static int local_pipe_callback(struct local_client *thisfd, char *buf,
558 			       int maxlen, const char *csid,
559 			       struct local_client **new_client)
560 {
561 	int len;
562 	char buffer[PIPE_BUF];
563 	struct local_client *sock_client = thisfd->bits.pipe.client;
564 	int status = -1;	/* in error by default */
565 
566 	len = read(thisfd->fd, buffer, sizeof(int));
567 	if (len == -1 && errno == EINTR)
568 		return 1;
569 
570 	if (len == sizeof(int)) {
571 		memcpy(&status, buffer, sizeof(int));
572 	}
573 
574 	DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
575 		 thisfd->fd, len, status);
576 
577 	/* EOF on pipe or an error, close it */
578 	if (len <= 0) {
579 		int jstat;
580 		void *ret = &status;
581 		close(thisfd->fd);
582 
583 		/* Clear out the cross-link */
584 		if (thisfd->bits.pipe.client != NULL)
585 			thisfd->bits.pipe.client->bits.localsock.pipe_client =
586 			    NULL;
587 
588 		/* Reap child thread */
589 		if (thisfd->bits.pipe.threadid) {
590 			jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
591 			thisfd->bits.pipe.threadid = 0;
592 			if (thisfd->bits.pipe.client != NULL)
593 				thisfd->bits.pipe.client->bits.localsock.
594 				    threadid = 0;
595 		}
596 		return -1;
597 	} else {
598 		DEBUGLOG("background routine status was %d, sock_client=%p\n",
599 			 status, sock_client);
600 		/* But has the client gone away ?? */
601 		if (sock_client == NULL) {
602 			DEBUGLOG
603 			    ("Got PIPE response for dead client, ignoring it\n");
604 		} else {
605 			/* If error then just return that code */
606 			if (status)
607 				send_local_reply(sock_client, status,
608 						 sock_client->fd);
609 			else {
610 				if (sock_client->bits.localsock.state ==
611 				    POST_COMMAND) {
612 					send_local_reply(sock_client, 0,
613 							 sock_client->fd);
614 				} else	// PRE_COMMAND finished.
615 				{
616 					if (
617 					    (status =
618 					     distribute_command(sock_client)) !=
619 					    0) send_local_reply(sock_client,
620 								EFBIG,
621 								sock_client->
622 								fd);
623 				}
624 			}
625 		}
626 	}
627 	return len;
628 }
629 
630 /* If a noed is up, look for it in the reply array, if it's not there then
631    add one with "ETIMEDOUT".
632    NOTE: This won't race with real replies because they happen in the same thread.
633 */
timedout_callback(struct local_client * client,const char * csid,int node_up)634 static void timedout_callback(struct local_client *client, const char *csid,
635 			      int node_up)
636 {
637 	if (node_up) {
638 		struct node_reply *reply;
639 		char nodename[max_cluster_member_name_len];
640 
641 		clops->name_from_csid(csid, nodename);
642 		DEBUGLOG("Checking for a reply from %s\n", nodename);
643 		pthread_mutex_lock(&client->bits.localsock.reply_mutex);
644 
645 		reply = client->bits.localsock.replies;
646 		while (reply && strcmp(reply->node, nodename) != 0) {
647 			reply = reply->next;
648 		}
649 
650 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
651 
652 		if (!reply) {
653 			DEBUGLOG("Node %s timed-out\n", nodename);
654 			add_reply_to_list(client, ETIMEDOUT, csid,
655 					  "Command timed out", 18);
656 		}
657 	}
658 }
659 
660 /* Called when the request has timed out on at least one node. We fill in
661    the remaining node entries with ETIMEDOUT and return.
662 
663    By the time we get here the node that caused
664    the timeout could have gone down, in which case we will never get the expected
665    number of replies that triggers the post command so we need to do it here
666 */
request_timed_out(struct local_client * client)667 static void request_timed_out(struct local_client *client)
668 {
669 	DEBUGLOG("Request timed-out. padding\n");
670 	clops->cluster_do_node_callback(client, timedout_callback);
671 
672 	if (client->bits.localsock.num_replies !=
673 	    client->bits.localsock.expected_replies) {
674 		/* Post-process the command */
675 		if (client->bits.localsock.threadid) {
676 			pthread_mutex_lock(&client->bits.localsock.mutex);
677 			client->bits.localsock.state = POST_COMMAND;
678 			pthread_cond_signal(&client->bits.localsock.cond);
679 			pthread_mutex_unlock(&client->bits.localsock.mutex);
680 		}
681 	}
682 }
683 
/* This is where the real work happens.

   local_sock:  listening socket; only closed here, on the way out.
   cmd_timeout: seconds used both as the select() timeout and as the age
                after which an unanswered request is declared timed out.

   Each pass waits on the cluster fd and every client fd on the list that
   starts at local_client_head, runs the callback of each readable fd, and
   unlinks clients that failed, reached EOF or were flagged with removeme.
   The loop ends when 'quit' becomes non-zero or a cluster socket callback
   reports EOF/error; the cluster interface is then closed down. */
static void main_loop(int local_sock, int cmd_timeout)
{
	DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);

	/* main() blocked these; let this thread receive them again */
	sigset_t ss;
	sigemptyset(&ss);
	sigaddset(&ss, SIGINT);
	sigaddset(&ss, SIGTERM);
	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
	/* Main loop */
	while (!quit) {
		fd_set in;
		int select_status;
		struct local_client *thisfd;
		struct timeval tv = { cmd_timeout, 0 };
		int quorate = clops->is_quorate();

		/* Wait on the cluster FD and all local sockets/pipes */
		local_client_head.fd = clops->get_main_cluster_fd();
		FD_ZERO(&in);
		for (thisfd = &local_client_head; thisfd != NULL;
		     thisfd = thisfd->next) {

			/* Entries awaiting removal are not polled */
			if (thisfd->removeme)
				continue;

			/* if the cluster is not quorate then don't listen for new requests */
			if ((thisfd->type != LOCAL_RENDEZVOUS &&
			     thisfd->type != LOCAL_SOCK) || quorate)
				FD_SET(thisfd->fd, &in);
		}

		select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);

		/* Handle a pending config re-read; errno from select() is
		   preserved for the checks further down */
		if (reread_config) {
			int saved_errno = errno;

			reread_config = 0;
			if (clops->reread_config)
				clops->reread_config();
			errno = saved_errno;
		}

		if (select_status > 0) {
			/* Entry before thisfd, needed to unlink thisfd.
			   NOTE(review): still NULL while thisfd is the list
			   head; the unlink paths below assume the head itself
			   is never removed - confirm */
			struct local_client *lastfd = NULL;
			char csid[MAX_CSID_LEN];
			char buf[max_cluster_message];

			for (thisfd = &local_client_head; thisfd != NULL;
			     thisfd = thisfd->next) {

				if (thisfd->removeme) {
					struct local_client *free_fd;
					lastfd->next = thisfd->next;
					free_fd = thisfd;
					thisfd = lastfd;

					DEBUGLOG("removeme set for fd %d\n", free_fd->fd);

					/* Queue cleanup, this also frees the client struct */
					add_to_lvmqueue(free_fd, NULL, 0, NULL);
					/* The list changed: stop this pass */
					break;
				}

				if (FD_ISSET(thisfd->fd, &in)) {
					struct local_client *newfd = NULL;
					int ret;

					/* Do callback */
					ret =
					    thisfd->callback(thisfd, buf,
							     sizeof(buf), csid,
							     &newfd);
					/* Ignore EAGAIN */
					if (ret < 0 && (errno == EAGAIN ||
							errno == EINTR)) continue;

					/* Got error or EOF: Remove it from the list safely */
					if (ret <= 0) {
						struct local_client *free_fd;
						int type = thisfd->type;

						/* If the cluster socket shuts down, so do we */
						if (type == CLUSTER_MAIN_SOCK ||
						    type == CLUSTER_INTERNAL)
							goto closedown;

						DEBUGLOG("ret == %d, errno = %d. removing client\n",
							 ret, errno);
						lastfd->next = thisfd->next;
						free_fd = thisfd;
						thisfd = lastfd;
						close(free_fd->fd);

						/* Queue cleanup, this also frees the client struct */
						add_to_lvmqueue(free_fd, NULL, 0, NULL);
						break;
					}

					/* New client...simply add it to the list */
					if (newfd) {
						newfd->next = thisfd->next;
						thisfd->next = newfd;
						break;
					}
				}
				lastfd = thisfd;
			}
		}

		/* Select timed out. Check for clients that have been waiting too long for a response */
		if (select_status == 0) {
			time_t the_time = time(NULL);

			for (thisfd = &local_client_head; thisfd != NULL;
			     thisfd = thisfd->next) {
				/* A request is overdue if it was sent out more
				   than cmd_timeout seconds ago and replies are
				   still missing */
				if (thisfd->type == LOCAL_SOCK
				    && thisfd->bits.localsock.sent_out
				    && thisfd->bits.localsock.sent_time +
				    cmd_timeout < the_time
				    && thisfd->bits.localsock.
				    expected_replies !=
				    thisfd->bits.localsock.num_replies) {
					/* Send timed out message + replies we already have */
					DEBUGLOG
					    ("Request timed-out (send: %ld, now: %ld)\n",
					     thisfd->bits.localsock.sent_time,
					     the_time);

					thisfd->bits.localsock.all_success = 0;

					request_timed_out(thisfd);
				}
			}
		}
		if (select_status < 0) {
			if (errno == EINTR)
				continue;

			/* NOTE(review): without DEBUG defined any other
			   select() error is ignored and the loop simply
			   tries again */
#ifdef DEBUG
			perror("select error");
			exit(-1);
#endif
		}
	}

      closedown:
	clops->cluster_closedown();
	close(local_sock);
}
835 
wait_for_child(int c_pipe,int timeout)836 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
837 {
838 	int child_status;
839 	int sstat;
840 	fd_set fds;
841 	struct timeval tv = {timeout, 0};
842 
843 	FD_ZERO(&fds);
844 	FD_SET(c_pipe, &fds);
845 
846 	sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
847 	if (sstat == 0) {
848 		fprintf(stderr, "clvmd startup timed out\n");
849 		exit(DFAIL_TIMEOUT);
850 	}
851 	if (sstat == 1) {
852 		if (read(c_pipe, &child_status, sizeof(child_status)) !=
853 		    sizeof(child_status)) {
854 
855 			fprintf(stderr, "clvmd failed in initialisation\n");
856 			exit(DFAIL_INIT);
857 		}
858 		else {
859 			switch (child_status) {
860 			case SUCCESS:
861 				break;
862 			case DFAIL_INIT:
863 				fprintf(stderr, "clvmd failed in initialisation\n");
864 				break;
865 			case DFAIL_LOCAL_SOCK:
866 				fprintf(stderr, "clvmd could not create local socket\n");
867 				fprintf(stderr, "Another clvmd is probably already running\n");
868 				break;
869 			case DFAIL_CLUSTER_IF:
870 				fprintf(stderr, "clvmd could not connect to cluster manager\n");
871 				fprintf(stderr, "Consult syslog for more information\n");
872 				break;
873 			case DFAIL_MALLOC:
874 				fprintf(stderr, "clvmd failed, not enough memory\n");
875 				break;
876 			default:
877 				fprintf(stderr, "clvmd failed, error was %d\n", child_status);
878 				break;
879 			}
880 			exit(child_status);
881 		}
882 	}
883 	fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
884 	exit(DFAIL_INIT);
885 }
886 
887 /*
888  * Fork into the background and detach from our parent process.
889  * In the interests of user-friendliness we wait for the daemon
890  * to complete initialisation before returning its status
891  * the the user.
892  */
be_daemon(int timeout)893 static void be_daemon(int timeout)
894 {
895         pid_t pid;
896 	int devnull = open("/dev/null", O_RDWR);
897 	if (devnull == -1) {
898 		perror("Can't open /dev/null");
899 		exit(3);
900 	}
901 
902 	pipe(child_pipe);
903 
904 	switch (pid = fork()) {
905 	case -1:
906 		perror("clvmd: can't fork");
907 		exit(2);
908 
909 	case 0:		/* Child */
910 	        close(child_pipe[0]);
911 		break;
912 
913 	default:       /* Parent */
914 		close(child_pipe[1]);
915 		wait_for_child(child_pipe[0], timeout);
916 	}
917 
918 	/* Detach ourself from the calling environment */
919 	if (close(0) || close(1) || close(2)) {
920 		perror("Error closing terminal FDs");
921 		exit(4);
922 	}
923 	setsid();
924 
925 	if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
926 	    || dup2(devnull, 2) < 0) {
927 		int e = errno;
928 		perror("Error setting terminal FDs to /dev/null");
929 		log_error("Error setting terminal FDs to /dev/null: %s", strerror(e));
930 		exit(5);
931 	}
932 	if (chdir("/")) {
933 		log_error("Error setting current directory to /: %s", strerror(e));
934 		exit(6);
935 	}
936 
937 }
938 
939 /* Called when we have a read from the local socket.
940    was in the main loop but it's grown up and is a big girl now */
read_from_local_sock(struct local_client * thisfd)941 static int read_from_local_sock(struct local_client *thisfd)
942 {
943 	int len;
944 	int argslen;
945 	int missing_len;
946 	char buffer[PIPE_BUF];
947 
948 	len = read(thisfd->fd, buffer, sizeof(buffer));
949 	if (len == -1 && errno == EINTR)
950 		return 1;
951 
952 	DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
953 
954 	/* EOF or error on socket */
955 	if (len <= 0) {
956 		int *status;
957 		int jstat;
958 
959 		DEBUGLOG("EOF on local socket: inprogress=%d\n",
960 			 thisfd->bits.localsock.in_progress);
961 
962 		thisfd->bits.localsock.finished = 1;
963 
964 		/* If the client went away in mid command then tidy up */
965 		if (thisfd->bits.localsock.in_progress) {
966 			pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
967 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
968 			thisfd->bits.localsock.state = POST_COMMAND;
969 			pthread_cond_signal(&thisfd->bits.localsock.cond);
970 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
971 
972 			/* Free any unsent buffers */
973 			free_reply(thisfd);
974 		}
975 
976 		/* Kill the subthread & free resources */
977 		if (thisfd->bits.localsock.threadid) {
978 			DEBUGLOG("Waiting for child thread\n");
979 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
980 			thisfd->bits.localsock.state = PRE_COMMAND;
981 			pthread_cond_signal(&thisfd->bits.localsock.cond);
982 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
983 
984 			jstat =
985 			    pthread_join(thisfd->bits.localsock.threadid,
986 					 (void **) &status);
987 			DEBUGLOG("Joined child thread\n");
988 
989 			thisfd->bits.localsock.threadid = 0;
990 			pthread_cond_destroy(&thisfd->bits.localsock.cond);
991 			pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
992 
993 			/* Remove the pipe client */
994 			if (thisfd->bits.localsock.pipe_client != NULL) {
995 				struct local_client *newfd;
996 				struct local_client *lastfd = NULL;
997 				struct local_client *free_fd = NULL;
998 
999 				close(thisfd->bits.localsock.pipe_client->fd);	/* Close pipe */
1000 				close(thisfd->bits.localsock.pipe);
1001 
1002 				/* Remove pipe client */
1003 				for (newfd = &local_client_head; newfd != NULL;
1004 				     newfd = newfd->next) {
1005 					if (thisfd->bits.localsock.
1006 					    pipe_client == newfd) {
1007 						thisfd->bits.localsock.
1008 						    pipe_client = NULL;
1009 
1010 						lastfd->next = newfd->next;
1011 						free_fd = newfd;
1012 						newfd->next = lastfd;
1013 						free(free_fd);
1014 						break;
1015 					}
1016 					lastfd = newfd;
1017 				}
1018 			}
1019 		}
1020 
1021 		/* Free the command buffer */
1022 		free(thisfd->bits.localsock.cmd);
1023 
1024 		/* Clear out the cross-link */
1025 		if (thisfd->bits.localsock.pipe_client != NULL)
1026 			thisfd->bits.localsock.pipe_client->bits.pipe.client =
1027 			    NULL;
1028 
1029 		close(thisfd->fd);
1030 		return 0;
1031 	} else {
1032 		int comms_pipe[2];
1033 		struct local_client *newfd;
1034 		char csid[MAX_CSID_LEN];
1035 		struct clvm_header *inheader;
1036 		int status;
1037 
1038 		inheader = (struct clvm_header *) buffer;
1039 
1040 		/* Fill in the client ID */
1041 		inheader->clientid = htonl(thisfd->fd);
1042 
1043 		/* If we are already busy then return an error */
1044 		if (thisfd->bits.localsock.in_progress) {
1045 			struct clvm_header reply;
1046 			reply.cmd = CLVMD_CMD_REPLY;
1047 			reply.status = EBUSY;
1048 			reply.arglen = 0;
1049 			reply.flags = 0;
1050 			send_message(&reply, sizeof(reply), our_csid,
1051 				     thisfd->fd,
1052 				     "Error sending EBUSY reply to local user");
1053 			return len;
1054 		}
1055 
1056 		/* Free any old buffer space */
1057 		free(thisfd->bits.localsock.cmd);
1058 
1059 		/* See if we have the whole message */
1060 		argslen =
1061 		    len - strlen(inheader->node) - sizeof(struct clvm_header);
1062 		missing_len = inheader->arglen - argslen;
1063 
1064 		if (missing_len < 0)
1065 			missing_len = 0;
1066 
1067 		/* Save the message */
1068 		thisfd->bits.localsock.cmd = malloc(len + missing_len);
1069 
1070 		if (!thisfd->bits.localsock.cmd) {
1071 			struct clvm_header reply;
1072 			reply.cmd = CLVMD_CMD_REPLY;
1073 			reply.status = ENOMEM;
1074 			reply.arglen = 0;
1075 			reply.flags = 0;
1076 			send_message(&reply, sizeof(reply), our_csid,
1077 				     thisfd->fd,
1078 				     "Error sending ENOMEM reply to local user");
1079 			return 0;
1080 		}
1081 		memcpy(thisfd->bits.localsock.cmd, buffer, len);
1082 		thisfd->bits.localsock.cmd_len = len + missing_len;
1083 		inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1084 
1085 		/* If we don't have the full message then read the rest now */
1086 		if (missing_len) {
1087 			char *argptr =
1088 			    inheader->node + strlen(inheader->node) + 1;
1089 
1090 			while (missing_len > 0 && len >= 0) {
1091 				DEBUGLOG
1092 				    ("got %d bytes, need another %d (total %d)\n",
1093 				     argslen, missing_len, inheader->arglen);
1094 				len = read(thisfd->fd, argptr + argslen,
1095 					   missing_len);
1096 				if (len >= 0) {
1097 					missing_len -= len;
1098 					argslen += len;
1099 				}
1100 			}
1101 		}
1102 
1103 		/* Initialise and lock the mutex so the subthread will wait after
1104 		   finishing the PRE routine */
1105 		if (!thisfd->bits.localsock.threadid) {
1106 			pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1107 			pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1108 			pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1109 		}
1110 
1111 		/* Only run the command if all the cluster nodes are running CLVMD */
1112 		if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1113 		    (check_all_clvmds_running(thisfd) == -1)) {
1114 			thisfd->bits.localsock.expected_replies = 0;
1115 			thisfd->bits.localsock.num_replies = 0;
1116 			send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1117 			return len;
1118 		}
1119 
1120 		/* Check the node name for validity */
1121 		if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1122 			/* Error, node is not in the cluster */
1123 			struct clvm_header reply;
1124 			DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1125 
1126 			reply.cmd = CLVMD_CMD_REPLY;
1127 			reply.status = ENOENT;
1128 			reply.flags = 0;
1129 			reply.arglen = 0;
1130 			send_message(&reply, sizeof(reply), our_csid,
1131 				     thisfd->fd,
1132 				     "Error sending ENOENT reply to local user");
1133 			thisfd->bits.localsock.expected_replies = 0;
1134 			thisfd->bits.localsock.num_replies = 0;
1135 			thisfd->bits.localsock.in_progress = FALSE;
1136 			thisfd->bits.localsock.sent_out = FALSE;
1137 			return len;
1138 		}
1139 
1140 		/* If we already have a subthread then just signal it to start */
1141 		if (thisfd->bits.localsock.threadid) {
1142 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1143 			thisfd->bits.localsock.state = PRE_COMMAND;
1144 			pthread_cond_signal(&thisfd->bits.localsock.cond);
1145 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1146 			return len;
1147 		}
1148 
1149 		/* Create a pipe and add the reading end to our FD list */
1150 		pipe(comms_pipe);
1151 		newfd = malloc(sizeof(struct local_client));
1152 		if (!newfd) {
1153 			struct clvm_header reply;
1154 			close(comms_pipe[0]);
1155 			close(comms_pipe[1]);
1156 
1157 			reply.cmd = CLVMD_CMD_REPLY;
1158 			reply.status = ENOMEM;
1159 			reply.arglen = 0;
1160 			reply.flags = 0;
1161 			send_message(&reply, sizeof(reply), our_csid,
1162 				     thisfd->fd,
1163 				     "Error sending ENOMEM reply to local user");
1164 			return len;
1165 		}
1166 		DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1167 			 comms_pipe[1]);
1168 		newfd->fd = comms_pipe[0];
1169 		newfd->removeme = 0;
1170 		newfd->type = THREAD_PIPE;
1171 		newfd->callback = local_pipe_callback;
1172 		newfd->next = thisfd->next;
1173 		newfd->bits.pipe.client = thisfd;
1174 		newfd->bits.pipe.threadid = 0;
1175 		thisfd->next = newfd;
1176 
1177 		/* Store a cross link to the pipe */
1178 		thisfd->bits.localsock.pipe_client = newfd;
1179 
1180 		thisfd->bits.localsock.pipe = comms_pipe[1];
1181 
1182 		/* Make sure the thread has a copy of it's own ID */
1183 		newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1184 
1185 		/* Run the pre routine */
1186 		thisfd->bits.localsock.in_progress = TRUE;
1187 		thisfd->bits.localsock.state = PRE_COMMAND;
1188 		DEBUGLOG("Creating pre&post thread\n");
1189 		status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1190 			       pre_and_post_thread, thisfd);
1191 		DEBUGLOG("Created pre&post thread, state = %d\n", status);
1192 	}
1193 	return len;
1194 }
1195 
1196 /* Add a file descriptor from the cluster or comms interface to
1197    our list of FDs for select
1198 */
add_client(struct local_client * new_client)1199 int add_client(struct local_client *new_client)
1200 {
1201 	new_client->next = local_client_head.next;
1202 	local_client_head.next = new_client;
1203 
1204 	return 0;
1205 }
1206 
1207 /* Called when the pre-command has completed successfully - we
1208    now execute the real command on all the requested nodes */
distribute_command(struct local_client * thisfd)1209 static int distribute_command(struct local_client *thisfd)
1210 {
1211 	struct clvm_header *inheader =
1212 	    (struct clvm_header *) thisfd->bits.localsock.cmd;
1213 	int len = thisfd->bits.localsock.cmd_len;
1214 
1215 	thisfd->xid = global_xid++;
1216 	DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1217 
1218 	/* Forward it to other nodes in the cluster if needed */
1219 	if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1220 		/* if node is empty then do it on the whole cluster */
1221 		if (inheader->node[0] == '\0') {
1222 			thisfd->bits.localsock.expected_replies =
1223 			    clops->get_num_nodes();
1224 			thisfd->bits.localsock.num_replies = 0;
1225 			thisfd->bits.localsock.sent_time = time(NULL);
1226 			thisfd->bits.localsock.in_progress = TRUE;
1227 			thisfd->bits.localsock.sent_out = TRUE;
1228 
1229 			/* Do it here first */
1230 			add_to_lvmqueue(thisfd, inheader, len, NULL);
1231 
1232 			DEBUGLOG("Sending message to all cluster nodes\n");
1233 			inheader->xid = thisfd->xid;
1234 			send_message(inheader, len, NULL, -1,
1235 				     "Error forwarding message to cluster");
1236 		} else {
1237                         /* Do it on a single node */
1238 			char csid[MAX_CSID_LEN];
1239 
1240 			if (clops->csid_from_name(csid, inheader->node)) {
1241 				/* This has already been checked so should not happen */
1242 				return 0;
1243 			} else {
1244 			        /* OK, found a node... */
1245 				thisfd->bits.localsock.expected_replies = 1;
1246 				thisfd->bits.localsock.num_replies = 0;
1247 				thisfd->bits.localsock.in_progress = TRUE;
1248 
1249 				/* Are we the requested node ?? */
1250 				if (memcmp(csid, our_csid, max_csid_len) == 0) {
1251 					DEBUGLOG("Doing command on local node only\n");
1252 					add_to_lvmqueue(thisfd, inheader, len, NULL);
1253 				} else {
1254 					DEBUGLOG("Sending message to single node: %s\n",
1255 						 inheader->node);
1256 					inheader->xid = thisfd->xid;
1257 					send_message(inheader, len,
1258 						     csid, -1,
1259 						     "Error forwarding message to cluster node");
1260 				}
1261 			}
1262 		}
1263 	} else {
1264 		/* Local explicitly requested, ignore nodes */
1265 		thisfd->bits.localsock.in_progress = TRUE;
1266 		thisfd->bits.localsock.expected_replies = 1;
1267 		thisfd->bits.localsock.num_replies = 0;
1268 		add_to_lvmqueue(thisfd, inheader, len, NULL);
1269 	}
1270 	return 0;
1271 }
1272 
1273 /* Process a command from a remote node and return the result */
process_remote_command(struct clvm_header * msg,int msglen,int fd,const char * csid)1274 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1275 			    	   const char *csid)
1276 {
1277 	char *replyargs;
1278 	char nodename[max_cluster_member_name_len];
1279 	int replylen = 0;
1280 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1281 	int status;
1282 	int msg_malloced = 0;
1283 
1284 	/* Get the node name as we /may/ need it later */
1285 	clops->name_from_csid(csid, nodename);
1286 
1287 	DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1288 		 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1289 
1290 	/* Check for GOAWAY and sulk */
1291 	if (msg->cmd == CLVMD_CMD_GOAWAY) {
1292 
1293 		DEBUGLOG("Told to go away by %s\n", nodename);
1294 		log_error("Told to go away by %s\n", nodename);
1295 		exit(99);
1296 	}
1297 
1298 	/* Version check is internal - don't bother exposing it in
1299 	   clvmd-command.c */
1300 	if (msg->cmd == CLVMD_CMD_VERSION) {
1301 		int version_nums[3];
1302 		char node[256];
1303 
1304 		memcpy(version_nums, msg->args, sizeof(version_nums));
1305 
1306 		clops->name_from_csid(csid, node);
1307 		DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1308 			 node,
1309 			 ntohl(version_nums[0]),
1310 			 ntohl(version_nums[1]), ntohl(version_nums[2]));
1311 
1312 		if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1313 			struct clvm_header byebyemsg;
1314 			DEBUGLOG
1315 			    ("Telling node %s to go away because of incompatible version number\n",
1316 			     node);
1317 			log_notice
1318 			    ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1319 			     node, ntohl(version_nums[0]),
1320 			     ntohl(version_nums[1]), ntohl(version_nums[2]));
1321 
1322 			byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1323 			byebyemsg.status = 0;
1324 			byebyemsg.flags = 0;
1325 			byebyemsg.arglen = 0;
1326 			byebyemsg.clientid = 0;
1327 			clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1328 					     our_csid,
1329 					     "Error Sending GOAWAY message");
1330 		} else {
1331 			clops->add_up_node(csid);
1332 		}
1333 		return;
1334 	}
1335 
1336 	/* Allocate a default reply buffer */
1337 	replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1338 
1339 	if (replyargs != NULL) {
1340 		/* Run the command */
1341 		status =
1342 		    do_command(NULL, msg, msglen, &replyargs, buflen,
1343 			       &replylen);
1344 	} else {
1345 		status = ENOMEM;
1346 	}
1347 
1348 	/* If it wasn't a reply, then reply */
1349 	if (msg->cmd != CLVMD_CMD_REPLY) {
1350 		char *aggreply;
1351 
1352 		aggreply =
1353 		    realloc(replyargs, replylen + sizeof(struct clvm_header));
1354 		if (aggreply) {
1355 			struct clvm_header *agghead =
1356 			    (struct clvm_header *) aggreply;
1357 
1358 			replyargs = aggreply;
1359 			/* Move it up so there's room for a header in front of the data */
1360 			memmove(aggreply + offsetof(struct clvm_header, args),
1361 				replyargs, replylen);
1362 
1363 			agghead->xid = msg->xid;
1364 			agghead->cmd = CLVMD_CMD_REPLY;
1365 			agghead->status = status;
1366 			agghead->flags = 0;
1367 			agghead->clientid = msg->clientid;
1368 			agghead->arglen = replylen;
1369 			agghead->node[0] = '\0';
1370 			send_message(aggreply,
1371 				     sizeof(struct clvm_header) +
1372 				     replylen, csid, fd,
1373 				     "Error sending command reply");
1374 		} else {
1375 			struct clvm_header head;
1376 
1377 			DEBUGLOG("Error attempting to realloc return buffer\n");
1378 			/* Return a failure response */
1379 			head.cmd = CLVMD_CMD_REPLY;
1380 			head.status = ENOMEM;
1381 			head.flags = 0;
1382 			head.clientid = msg->clientid;
1383 			head.arglen = 0;
1384 			head.node[0] = '\0';
1385 			send_message(&head, sizeof(struct clvm_header), csid,
1386 				     fd, "Error sending ENOMEM command reply");
1387 			return;
1388 		}
1389 	}
1390 
1391 	/* Free buffer if it was malloced */
1392 	if (msg_malloced) {
1393 		free(msg);
1394 	}
1395 	free(replyargs);
1396 }
1397 
1398 /* Add a reply to a command to the list of replies for this client.
1399    If we have got a full set then send them to the waiting client down the local
1400    socket */
add_reply_to_list(struct local_client * client,int status,const char * csid,const char * buf,int len)1401 static void add_reply_to_list(struct local_client *client, int status,
1402 			      const char *csid, const char *buf, int len)
1403 {
1404 	struct node_reply *reply;
1405 
1406 	pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1407 
1408 	/* Add it to the list of replies */
1409 	reply = malloc(sizeof(struct node_reply));
1410 	if (reply) {
1411 		reply->status = status;
1412 		clops->name_from_csid(csid, reply->node);
1413 		DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1414 
1415 		if (len > 0) {
1416 			reply->replymsg = malloc(len);
1417 			if (!reply->replymsg) {
1418 				reply->status = ENOMEM;
1419 			} else {
1420 				memcpy(reply->replymsg, buf, len);
1421 			}
1422 		} else {
1423 			reply->replymsg = NULL;
1424 		}
1425 		/* Hook it onto the reply chain */
1426 		reply->next = client->bits.localsock.replies;
1427 		client->bits.localsock.replies = reply;
1428 	} else {
1429 		/* It's all gone horribly wrong... */
1430 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1431 		send_local_reply(client, ENOMEM, client->fd);
1432 		return;
1433 	}
1434 	DEBUGLOG("Got %d replies, expecting: %d\n",
1435 		 client->bits.localsock.num_replies + 1,
1436 		 client->bits.localsock.expected_replies);
1437 
1438 	/* If we have the whole lot then do the post-process */
1439 	if (++client->bits.localsock.num_replies ==
1440 	    client->bits.localsock.expected_replies) {
1441 		/* Post-process the command */
1442 		if (client->bits.localsock.threadid) {
1443 			pthread_mutex_lock(&client->bits.localsock.mutex);
1444 			client->bits.localsock.state = POST_COMMAND;
1445 			pthread_cond_signal(&client->bits.localsock.cond);
1446 			pthread_mutex_unlock(&client->bits.localsock.mutex);
1447 		}
1448 	}
1449 	pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1450 }
1451 
/*
 * Thread that runs the PRE and POST commands for a particular connection.
 *
 * arg is the struct local_client of the connection.  The thread loops
 * until client->bits.localsock.finished is set, writing an int status down
 * the client's pipe after each stage.  It never returns to its caller; it
 * leaves via pthread_exit().
 */
static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
{
	struct local_client *client = (struct local_client *) arg;
	int status;
	int write_status;
	sigset_t ss;
	int pipe_fd = client->bits.localsock.pipe;

	DEBUGLOG("in sub thread: client = %p\n", client);

	/* Don't start until the LVM thread is ready: taking and dropping
	   lvm_start_mutex blocks here for as long as someone else holds it */
	pthread_mutex_lock(&lvm_start_mutex);
	pthread_mutex_unlock(&lvm_start_mutex);
	DEBUGLOG("Sub thread ready for work.\n");

	/* Ignore SIGUSR1 (handled by master process) but enable
	   SIGUSR2 (kills subthreads) */
	sigemptyset(&ss);
	sigaddset(&ss, SIGUSR1);
	pthread_sigmask(SIG_BLOCK, &ss, NULL);

	/* Reuse the set: drop SIGUSR1, add SIGUSR2, then unblock it */
	sigdelset(&ss, SIGUSR1);
	sigaddset(&ss, SIGUSR2);
	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);

	/* Loop around doing PRE and POST functions until the client goes away */
	while (!client->bits.localsock.finished) {
		/* Execute the code */
		status = do_pre_command(client);

		if (status)
			client->bits.localsock.all_success = 0;

		DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);

		/* Tell the parent process we have finished this bit.
		   Retry on EINTR/EAGAIN; any other failure or a short write
		   is logged and abandoned. */
		do {
			write_status = write(pipe_fd, &status, sizeof(int));
			if (write_status == sizeof(int))
				break;
			if (write_status < 0 &&
			    (errno == EINTR || errno == EAGAIN))
				continue;
			log_error("Error sending to pipe: %s\n", strerror(errno));
			break;
		} while(1);

		/* A failed PRE command skips the POST stage entirely */
		if (status) {
			client->bits.localsock.state = POST_COMMAND;
			goto next_pre;
		}

		/* We may need to wait for the condition variable before running the post command */
		pthread_mutex_lock(&client->bits.localsock.mutex);
		DEBUGLOG("Waiting to do post command - state = %d\n",
			 client->bits.localsock.state);

		/* NOTE(review): the wait is done once, not in a loop - confirm
		   that an early wakeup running the post command is acceptable */
		if (client->bits.localsock.state != POST_COMMAND) {
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}
		pthread_mutex_unlock(&client->bits.localsock.mutex);

		DEBUGLOG("Got post command condition...\n");

		/* POST function must always run, even if the client aborts */
		status = 0;
		do_post_command(client);

		/* Report completion of the POST stage (status is always 0 here) */
		do {
			write_status = write(pipe_fd, &status, sizeof(int));
			if (write_status == sizeof(int))
				break;
			if (write_status < 0 &&
			    (errno == EINTR || errno == EAGAIN))
				continue;
			log_error("Error sending to pipe: %s\n", strerror(errno));
			break;
		} while(1);
next_pre:
		DEBUGLOG("Waiting for next pre command\n");

		/* Sleep until the state is PRE_COMMAND again or the client
		   has been marked finished */
		pthread_mutex_lock(&client->bits.localsock.mutex);
		if (client->bits.localsock.state != PRE_COMMAND &&
		    !client->bits.localsock.finished) {
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}
		pthread_mutex_unlock(&client->bits.localsock.mutex);

		DEBUGLOG("Got pre command condition...\n");
	}
	DEBUGLOG("Subthread finished\n");
	pthread_exit((void *) 0);
}
1548 
1549 /* Process a command on the local node and store the result */
process_local_command(struct clvm_header * msg,int msglen,struct local_client * client,unsigned short xid)1550 static int process_local_command(struct clvm_header *msg, int msglen,
1551 				 struct local_client *client,
1552 				 unsigned short xid)
1553 {
1554 	char *replybuf = malloc(max_cluster_message);
1555 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1556 	int replylen = 0;
1557 	int status;
1558 
1559 	DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1560 		 decode_cmd(msg->cmd), msg, msglen, client);
1561 
1562 	if (replybuf == NULL)
1563 		return -1;
1564 
1565 	status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1566 
1567 	if (status)
1568 		client->bits.localsock.all_success = 0;
1569 
1570 	/* If we took too long then discard the reply */
1571 	if (xid == client->xid) {
1572 		add_reply_to_list(client, status, our_csid, replybuf, replylen);
1573 	} else {
1574 		DEBUGLOG
1575 		    ("Local command took too long, discarding xid %d, current is %d\n",
1576 		     xid, client->xid);
1577 	}
1578 
1579 	free(replybuf);
1580 	return status;
1581 }
1582 
process_reply(const struct clvm_header * msg,int msglen,const char * csid)1583 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1584 {
1585 	struct local_client *client = NULL;
1586 
1587 	client = find_client(msg->clientid);
1588 	if (!client) {
1589 		DEBUGLOG("Got message for unknown client 0x%x\n",
1590 			 msg->clientid);
1591 		log_error("Got message for unknown client 0x%x\n",
1592 			  msg->clientid);
1593 		return -1;
1594 	}
1595 
1596 	if (msg->status)
1597 		client->bits.localsock.all_success = 0;
1598 
1599 	/* Gather replies together for this client id */
1600 	if (msg->xid == client->xid) {
1601 		add_reply_to_list(client, msg->status, csid, msg->args,
1602 				  msg->arglen);
1603 	} else {
1604 		DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1605 			 msg->xid, client->xid);
1606 	}
1607 	return 0;
1608 }
1609 
1610 /* Send an aggregated reply back to the client */
send_local_reply(struct local_client * client,int status,int fd)1611 static void send_local_reply(struct local_client *client, int status, int fd)
1612 {
1613 	struct clvm_header *clientreply;
1614 	struct node_reply *thisreply = client->bits.localsock.replies;
1615 	char *replybuf;
1616 	char *ptr;
1617 	int message_len = 0;
1618 
1619 	DEBUGLOG("Send local reply\n");
1620 
1621 	/* Work out the total size of the reply */
1622 	while (thisreply) {
1623 		if (thisreply->replymsg)
1624 			message_len += strlen(thisreply->replymsg) + 1;
1625 		else
1626 			message_len++;
1627 
1628 		message_len += strlen(thisreply->node) + 1 + sizeof(int);
1629 
1630 		thisreply = thisreply->next;
1631 	}
1632 
1633 	/* Add in the size of our header */
1634 	message_len = message_len + sizeof(struct clvm_header) + 1;
1635 	replybuf = malloc(message_len);
1636 
1637 	clientreply = (struct clvm_header *) replybuf;
1638 	clientreply->status = status;
1639 	clientreply->cmd = CLVMD_CMD_REPLY;
1640 	clientreply->node[0] = '\0';
1641 	clientreply->flags = 0;
1642 
1643 	ptr = clientreply->args;
1644 
1645 	/* Add in all the replies, and free them as we go */
1646 	thisreply = client->bits.localsock.replies;
1647 	while (thisreply) {
1648 		struct node_reply *tempreply = thisreply;
1649 
1650 		strcpy(ptr, thisreply->node);
1651 		ptr += strlen(thisreply->node) + 1;
1652 
1653 		if (thisreply->status)
1654 			clientreply->flags |= CLVMD_FLAG_NODEERRS;
1655 
1656 		memcpy(ptr, &thisreply->status, sizeof(int));
1657 		ptr += sizeof(int);
1658 
1659 		if (thisreply->replymsg) {
1660 			strcpy(ptr, thisreply->replymsg);
1661 			ptr += strlen(thisreply->replymsg) + 1;
1662 		} else {
1663 			ptr[0] = '\0';
1664 			ptr++;
1665 		}
1666 		thisreply = thisreply->next;
1667 
1668 		free(tempreply->replymsg);
1669 		free(tempreply);
1670 	}
1671 
1672 	/* Terminate with an empty node name */
1673 	*ptr = '\0';
1674 
1675 	clientreply->arglen = ptr - clientreply->args + 1;
1676 
1677 	/* And send it */
1678 	send_message(replybuf, message_len, our_csid, fd,
1679 		     "Error sending REPLY to client");
1680 	free(replybuf);
1681 
1682 	/* Reset comms variables */
1683 	client->bits.localsock.replies = NULL;
1684 	client->bits.localsock.expected_replies = 0;
1685 	client->bits.localsock.in_progress = FALSE;
1686 	client->bits.localsock.sent_out = FALSE;
1687 }
1688 
1689 /* Just free a reply chain baceuse it wasn't used. */
free_reply(struct local_client * client)1690 static void free_reply(struct local_client *client)
1691 {
1692 	/* Add in all the replies, and free them as we go */
1693 	struct node_reply *thisreply = client->bits.localsock.replies;
1694 	while (thisreply) {
1695 		struct node_reply *tempreply = thisreply;
1696 
1697 		thisreply = thisreply->next;
1698 
1699 		free(tempreply->replymsg);
1700 		free(tempreply);
1701 	}
1702 	client->bits.localsock.replies = NULL;
1703 }
1704 
1705 /* Send our version number to the cluster */
send_version_message()1706 static void send_version_message()
1707 {
1708 	char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1709 	struct clvm_header *msg = (struct clvm_header *) message;
1710 	int version_nums[3];
1711 
1712 	msg->cmd = CLVMD_CMD_VERSION;
1713 	msg->status = 0;
1714 	msg->flags = 0;
1715 	msg->clientid = 0;
1716 	msg->arglen = sizeof(version_nums);
1717 
1718 	version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1719 	version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1720 	version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1721 
1722 	memcpy(&msg->args, version_nums, sizeof(version_nums));
1723 
1724 	hton_clvm(msg);
1725 
1726 	clops->cluster_send_message(message, sizeof(message), NULL,
1727 			     "Error Sending version number");
1728 }
1729 
1730 /* Send a message to either a local client or another server */
send_message(void * buf,int msglen,const char * csid,int fd,const char * errtext)1731 static int send_message(void *buf, int msglen, const char *csid, int fd,
1732 			const char *errtext)
1733 {
1734 	int len = 0;
1735 	int saved_errno = 0;
1736 	struct timespec delay;
1737 	struct timespec remtime;
1738 
1739 	int retry_cnt = 0;
1740 
1741 	/* Send remote messages down the cluster socket */
1742 	if (csid == NULL || !ISLOCAL_CSID(csid)) {
1743 		hton_clvm((struct clvm_header *) buf);
1744 		return clops->cluster_send_message(buf, msglen, csid, errtext);
1745 	} else {
1746 		int ptr = 0;
1747 
1748 		/* Make sure it all goes */
1749 		do {
1750 			if (retry_cnt > MAX_RETRIES)
1751 			{
1752 				errno = saved_errno;
1753 				log_error("%s", errtext);
1754 				errno = saved_errno;
1755 				break;
1756 			}
1757 
1758 			len = write(fd, buf + ptr, msglen - ptr);
1759 
1760 			if (len <= 0) {
1761 				if (errno == EINTR)
1762 					continue;
1763 				if (errno == EAGAIN ||
1764 				    errno == EIO ||
1765 				    errno == ENOSPC) {
1766 					saved_errno = errno;
1767 					retry_cnt++;
1768 
1769 					delay.tv_sec = 0;
1770 					delay.tv_nsec = 100000;
1771 					remtime.tv_sec = 0;
1772 					remtime.tv_nsec = 0;
1773 					(void) nanosleep (&delay, &remtime);
1774 
1775 					continue;
1776 				}
1777 				log_error("%s", errtext);
1778 				break;
1779 			}
1780 			ptr += len;
1781 		} while (ptr < msglen);
1782 	}
1783 	return len;
1784 }
1785 
process_work_item(struct lvm_thread_cmd * cmd)1786 static int process_work_item(struct lvm_thread_cmd *cmd)
1787 {
1788 	/* If msg is NULL then this is a cleanup request */
1789 	if (cmd->msg == NULL) {
1790 		DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1791 		cmd_client_cleanup(cmd->client);
1792 		free(cmd->client);
1793 		return 0;
1794 	}
1795 
1796 	if (!cmd->remote) {
1797 		DEBUGLOG("process_work_item: local\n");
1798 		process_local_command(cmd->msg, cmd->msglen, cmd->client,
1799 				      cmd->xid);
1800 	} else {
1801 		DEBUGLOG("process_work_item: remote\n");
1802 		process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1803 				       cmd->csid);
1804 	}
1805 	return 0;
1806 }
1807 
/*
 * Routine that runs in the "LVM thread".
 *
 * arg carries the using_gulm flag as an integer cast to a pointer; it is
 * passed to init_lvm().  Once initialisation is done the thread releases
 * lvm_start_mutex and then loops forever, taking work items off
 * lvm_cmd_head and running each through process_work_item().
 */
static void lvm_thread_fn(void *arg)
{
	struct dm_list *cmdl, *tmp;
	sigset_t ss;
	int using_gulm = (int)(long)arg;

	DEBUGLOG("LVM thread function started\n");

	/* Ignore SIGUSR1 & 2 */
	sigemptyset(&ss);
	sigaddset(&ss, SIGUSR1);
	sigaddset(&ss, SIGUSR2);
	pthread_sigmask(SIG_BLOCK, &ss, NULL);

	/* Initialise the interface to liblvm */
	init_lvm(using_gulm);

	/* Allow others to get moving */
	pthread_mutex_unlock(&lvm_start_mutex);

	/* Now wait for some actual work */
	for (;;) {
		DEBUGLOG("LVM thread waiting for work\n");

		pthread_mutex_lock(&lvm_thread_mutex);
		/* Sleep only if nothing is queued; a wakeup that finds the
		   list still empty simply iterates zero times below */
		if (dm_list_empty(&lvm_cmd_head))
			pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);

		dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
			struct lvm_thread_cmd *cmd;

			cmd =
			    dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
			/* Unlink the item, then drop the queue lock while it
			   is being processed */
			dm_list_del(&cmd->list);
			pthread_mutex_unlock(&lvm_thread_mutex);

			process_work_item(cmd);
			/* The queue entry owns its copy of the message */
			free(cmd->msg);
			free(cmd);

			/* Retake the lock before touching the list again */
			pthread_mutex_lock(&lvm_thread_mutex);
		}
		pthread_mutex_unlock(&lvm_thread_mutex);
	}
}
1856 
1857 /* Pass down some work to the LVM thread */
add_to_lvmqueue(struct local_client * client,struct clvm_header * msg,int msglen,const char * csid)1858 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1859 			   int msglen, const char *csid)
1860 {
1861 	struct lvm_thread_cmd *cmd;
1862 
1863 	cmd = malloc(sizeof(struct lvm_thread_cmd));
1864 	if (!cmd)
1865 		return ENOMEM;
1866 
1867 	if (msglen) {
1868 		cmd->msg = malloc(msglen);
1869 		if (!cmd->msg) {
1870 			log_error("Unable to allocate buffer space\n");
1871 			free(cmd);
1872 			return -1;
1873 		}
1874 		memcpy(cmd->msg, msg, msglen);
1875 	}
1876 	else {
1877 		cmd->msg = NULL;
1878 	}
1879 	cmd->client = client;
1880 	cmd->msglen = msglen;
1881 	cmd->xid = client->xid;
1882 
1883 	if (csid) {
1884 		memcpy(cmd->csid, csid, max_csid_len);
1885 		cmd->remote = 1;
1886 	} else {
1887 		cmd->remote = 0;
1888 	}
1889 
1890 	DEBUGLOG
1891 	    ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1892 	     cmd, client, msg, msglen, csid, cmd->xid);
1893 	pthread_mutex_lock(&lvm_thread_mutex);
1894 	dm_list_add(&lvm_cmd_head, &cmd->list);
1895 	pthread_cond_signal(&lvm_thread_cond);
1896 	pthread_mutex_unlock(&lvm_thread_mutex);
1897 
1898 	return 0;
1899 }
1900 
1901 /* Return 0 if we can talk to an existing clvmd */
check_local_clvmd(void)1902 static int check_local_clvmd(void)
1903 {
1904 	int local_socket;
1905 	struct sockaddr_un sockaddr;
1906 	int ret = 0;
1907 
1908 	/* Open local socket */
1909 	if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1910 		return -1;
1911 	}
1912 
1913 	memset(&sockaddr, 0, sizeof(sockaddr));
1914 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1915 	sockaddr.sun_family = AF_UNIX;
1916 
1917 	if (connect(local_socket,(struct sockaddr *) &sockaddr,
1918 		    sizeof(sockaddr))) {
1919 		ret = -1;
1920 	}
1921 
1922 	close(local_socket);
1923 	return ret;
1924 }
1925 
1926 
1927 /* Open the local socket, that's the one we talk to libclvm down */
open_local_sock()1928 static int open_local_sock()
1929 {
1930 	int local_socket;
1931 	struct sockaddr_un sockaddr;
1932 
1933 	/* Open local socket */
1934 	if (CLVMD_SOCKNAME[0] != '\0')
1935 		unlink(CLVMD_SOCKNAME);
1936 	local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1937 	if (local_socket < 0) {
1938 		log_error("Can't create local socket: %s", strerror(errno));
1939 		return -1;
1940 	}
1941 	/* Set Close-on-exec & non-blocking */
1942 	fcntl(local_socket, F_SETFD, 1);
1943 	fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1944 
1945 	memset(&sockaddr, 0, sizeof(sockaddr));
1946 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1947 	sockaddr.sun_family = AF_UNIX;
1948 	if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1949 		log_error("can't bind local socket: %s", strerror(errno));
1950 		close(local_socket);
1951 		return -1;
1952 	}
1953 	if (listen(local_socket, 1) != 0) {
1954 		log_error("listen local: %s", strerror(errno));
1955 		close(local_socket);
1956 		return -1;
1957 	}
1958 	if (CLVMD_SOCKNAME[0] != '\0')
1959 		chmod(CLVMD_SOCKNAME, 0600);
1960 
1961 	return local_socket;
1962 }
1963 
process_message(struct local_client * client,const char * buf,int len,const char * csid)1964 void process_message(struct local_client *client, const char *buf, int len,
1965 		     const char *csid)
1966 {
1967 	struct clvm_header *inheader;
1968 
1969 	inheader = (struct clvm_header *) buf;
1970 	ntoh_clvm(inheader);	/* Byteswap fields */
1971 	if (inheader->cmd == CLVMD_CMD_REPLY)
1972 		process_reply(inheader, len, csid);
1973 	else
1974 		add_to_lvmqueue(client, inheader, len, csid);
1975 }
1976 
1977 
/*
 * Per-node callback passed to cluster_do_node_callback() by
 * check_all_clvmds_running().  Nodes that are up need no action; for a node
 * that is not, an EHOSTDOWN reply naming the problem is added to the
 * client's reply list.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	/* The length passed along includes the terminating NUL */
	static const char msg[] = "CLVMD not running";

	if (node_up)
		return;

	add_reply_to_list(client, EHOSTDOWN, csid, msg, (int) sizeof(msg));
}
1985 
1986 /* Check to see if all CLVMDs are running (ie one on
1987    every node in the cluster).
1988    If not, returns -1 and prints out a list of errant nodes */
check_all_clvmds_running(struct local_client * client)1989 static int check_all_clvmds_running(struct local_client *client)
1990 {
1991 	DEBUGLOG("check_all_clvmds_running\n");
1992 	return clops->cluster_do_node_callback(client, check_all_callback);
1993 }
1994 
1995 /* Return a local_client struct given a client ID.
1996    client IDs are in network byte order */
find_client(int clientid)1997 static struct local_client *find_client(int clientid)
1998 {
1999 	struct local_client *thisfd;
2000 	for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2001 		if (thisfd->fd == ntohl(clientid))
2002 			return thisfd;
2003 	}
2004 	return NULL;
2005 }
2006 
2007 /* Byte-swapping routines for the header so we
2008    work in a heterogeneous environment */
hton_clvm(struct clvm_header * hdr)2009 static void hton_clvm(struct clvm_header *hdr)
2010 {
2011 	hdr->status = htonl(hdr->status);
2012 	hdr->arglen = htonl(hdr->arglen);
2013 	hdr->xid = htons(hdr->xid);
2014 	/* Don't swap clientid as it's only a token as far as
2015 	   remote nodes are concerned */
2016 }
2017 
ntoh_clvm(struct clvm_header * hdr)2018 static void ntoh_clvm(struct clvm_header *hdr)
2019 {
2020 	hdr->status = ntohl(hdr->status);
2021 	hdr->arglen = ntohl(hdr->arglen);
2022 	hdr->xid = ntohs(hdr->xid);
2023 }
2024 
/*
 * Handler for SIGUSR2 - sent to kill subthreads.
 * The handler itself only emits a debug message.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;	/* signal number is not needed */

	DEBUGLOG("SIGUSR2 received\n");
}
2031 
sigterm_handler(int sig)2032 static void sigterm_handler(int sig)
2033 {
2034 	DEBUGLOG("SIGTERM received\n");
2035 	quit = 1;
2036 	return;
2037 }
2038 
sighup_handler(int sig)2039 static void sighup_handler(int sig)
2040 {
2041         DEBUGLOG("got SIGHUP\n");
2042 	reread_config = 1;
2043 }
2044 
sync_lock(const char * resource,int mode,int flags,int * lockid)2045 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2046 {
2047 	return clops->sync_lock(resource, mode, flags, lockid);
2048 }
2049 
sync_unlock(const char * resource,int lockid)2050 int sync_unlock(const char *resource, int lockid)
2051 {
2052 	return clops->sync_unlock(resource, lockid);
2053 }
2054 
parse_cluster_interface(char * ifname)2055 static if_type_t parse_cluster_interface(char *ifname)
2056 {
2057 	if_type_t iface = IF_AUTO;
2058 
2059 	if (!strcmp(ifname, "auto"))
2060 		iface = IF_AUTO;
2061 	if (!strcmp(ifname, "cman"))
2062 		iface = IF_CMAN;
2063 	if (!strcmp(ifname, "gulm"))
2064 		iface = IF_GULM;
2065 	if (!strcmp(ifname, "openais"))
2066 		iface = IF_OPENAIS;
2067 	if (!strcmp(ifname, "corosync"))
2068 		iface = IF_COROSYNC;
2069 
2070 	return iface;
2071 }
2072 
2073 /*
2074  * Try and find a cluster system in corosync's objdb, if it is running. This is
2075  * only called if the command-line option is not present, and if it fails
2076  * we still try the interfaces in order.
2077  */
get_cluster_type()2078 static if_type_t get_cluster_type()
2079 {
2080 #ifdef HAVE_COROSYNC_CONFDB_H
2081 	confdb_handle_t handle;
2082 	if_type_t type = IF_AUTO;
2083 	int result;
2084 	char buf[255];
2085 	size_t namelen = sizeof(buf);
2086 	hdb_handle_t cluster_handle;
2087 	hdb_handle_t clvmd_handle;
2088 	confdb_callbacks_t callbacks = {
2089 		.confdb_key_change_notify_fn = NULL,
2090 		.confdb_object_create_change_notify_fn = NULL,
2091 		.confdb_object_delete_change_notify_fn = NULL
2092 	};
2093 
2094 	result = confdb_initialize (&handle, &callbacks);
2095         if (result != CS_OK)
2096 		return type;
2097 
2098         result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2099 	if (result != CS_OK)
2100 		goto out;
2101 
2102         result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2103         if (result != CS_OK)
2104 		goto out;
2105 
2106         result = confdb_object_find_start(handle, cluster_handle);
2107 	if (result != CS_OK)
2108 		goto out;
2109 
2110         result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2111         if (result != CS_OK)
2112 		goto out;
2113 
2114         result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2115         if (result != CS_OK)
2116 		goto out;
2117 
2118 	buf[namelen] = '\0';
2119 	type = parse_cluster_interface(buf);
2120 	DEBUGLOG("got interface type '%s' from confdb\n", buf);
2121 out:
2122 	confdb_finalize(handle);
2123 	return type;
2124 #else
2125 	return IF_AUTO;
2126 #endif
2127 }
2128