xref: /dragonfly/sbin/hammer2/cmd_service.c (revision e6d22e9b)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "hammer2.h"
37 
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40 
41 struct hammer2_media_config {
42 	hammer2_volconf_t	copy_run;
43 	hammer2_volconf_t	copy_pend;
44 	pthread_t		thread;
45 	int			thread_started;
46 	pthread_cond_t		cond;
47 	int			ctl;
48 	int			fd;
49 	int			pipefd[2];      /* signal stop */
50 	dmsg_iocom_t		iocom;
51 	pthread_t		iocom_thread;
52 	enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
53 };
54 
55 typedef struct hammer2_media_config hammer2_media_config_t;
56 
57 #define H2CONFCTL_STOP          0x00000001
58 #define H2CONFCTL_UPDATE        0x00000002
59 
60 struct diskcon {
61 	TAILQ_ENTRY(diskcon) entry;
62 	char	*disk;
63 };
64 
65 struct service_node_opaque {
66 	char	cl_label[64];
67 	char	fs_label[64];
68 	dmsg_media_block_t block;
69 	int	attached;
70 	int	servicing;
71 	int	servicefd;
72 };
73 
74 struct autoconn {
75 	TAILQ_ENTRY(autoconn) entry;
76 	char	*host;
77 	int	stage;
78 	int	stopme;
79 	int	pipefd[2];	/* {read,write} */
80 	enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
81 	pthread_t thread;
82 };
83 
84 #define WS " \r\n"
85 
86 static TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
87 static pthread_mutex_t diskmtx;
88 static pthread_mutex_t confmtx;
89 
90 static void *service_thread(void *data);
91 static void *udev_thread(void *data);
92 static void *autoconn_thread(void *data);
93 static void master_reconnect(const char *mntpt);
94 static void disk_reconnect(const char *disk);
95 static void disk_disconnect(void *handle);
96 static void udev_check_disks(void);
97 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
98 static void *hammer2_volconf_thread(void *info);
99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
100 static void hammer2_volconf_start(hammer2_media_config_t *conf,
101 			const char *hostname);
102 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
103 
104 
105 static void xdisk_connect(void);
106 
107 /*
108  * Start-up the master listener daemon for the machine.  This daemon runs
109  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
110  * and directories for work.
111  *
112  * --
113  *
114  * The only purpose for the UDP discovery protocol is to determine what
115  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
116  * required to operate, but hostnames (if assigned) must be unique.  If
117  * no hostname is assigned the host's IP is used as the name.  This name
118  * is broadcast along with the mtime of the originator's private key.
119  *
120  * Receiving hammer2 service daemons which are able to match the label against
121  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
122  * to the target.  Removal of the file will cause a disconnection.  A failed
123  * public key negotiation stops further connection attempts until either the
124  * file is updated or the remote mtime is updated.
125  *
126  * Generally speaking this results in a web of connections, typically a
127  * combination of point-to-point for the more important links and relayed
128  * (spanning tree) for less important or filtered links.
129  *
130  * --
131  *
132  * The TCP listener serves as a rendezvous point in the cluster, accepting
133  * connections, performing registrations and authentications, maintaining
134  * the spanning tree, and keeping track of message state so disconnects can
135  * be handled properly.
136  *
137  * Once authenticated only low-level messaging protocols (which includes
138  * tracking persistent messages) are handled by this daemon.  This daemon
139  * does not run the higher level quorum or locking protocols.
140  *
141  * --
142  *
143  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
144  * to connect to (which do not have to be on the local lan).  This list will
145  * be retried until a connection can be established.  The file is not usually
146  * needed for linkages local to the LAN.
147  */
148 int
149 cmd_service(void)
150 {
151 	struct sockaddr_in lsin;
152 	int on;
153 	int lfd;
154 
155 	/*
156 	 * Acquire socket and set options
157 	 */
158 	if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
159 		fprintf(stderr, "master_listen: socket(): %s\n",
160 			strerror(errno));
161 		return 1;
162 	}
163 	on = 1;
164 	setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
165 
166 	/*
167 	 * Setup listen port and try to bind.  If the bind fails we assume
168 	 * that a master listener process is already running and silently
169 	 * fail.
170 	 */
171 	bzero(&lsin, sizeof(lsin));
172 	lsin.sin_family = AF_INET;
173 	lsin.sin_addr.s_addr = INADDR_ANY;
174 	lsin.sin_port = htons(DMSG_LISTEN_PORT);
175 	if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
176 		close(lfd);
177 		if (QuietOpt == 0) {
178 			fprintf(stderr,
179 				"master listen: daemon already running\n");
180 		}
181 		return 0;
182 	}
183 	if (QuietOpt == 0)
184 		fprintf(stderr, "master listen: startup\n");
185 	listen(lfd, 50);
186 
187 	/*
188 	 * Fork and disconnect the controlling terminal and parent process,
189 	 * executing the specified function as a pthread.
190 	 *
191 	 * Returns to the original process which can then continue running.
192 	 * In debug mode this call will create the pthread without forking
193 	 * and set NormalExit to 0, instead of fork.
194 	 */
195 	hammer2_demon(service_thread, (void *)(intptr_t)lfd);
196 	if (NormalExit)
197 		close(lfd);
198 	return 0;
199 }
200 
201 /*
202  * Master listen/accept thread.  Accept connections on the master socket,
203  * starting a pthread for each one.
204  */
205 static
206 void *
207 service_thread(void *data)
208 {
209 	struct sockaddr_in asin;
210 	socklen_t alen;
211 	pthread_t thread;
212 	dmsg_master_service_info_t *info;
213 	int lfd = (int)(intptr_t)data;
214 	int fd;
215 	int i;
216 	int count;
217 	int opt;
218 	struct statfs *mntbuf = NULL;
219 	struct statvfs *mntvbuf = NULL;
220 
221 	/*
222 	 * Nobody waits for us
223 	 */
224 	setproctitle("hammer2 master listen");
225 	pthread_detach(pthread_self());
226 
227 	/*
228 	 * Start up a thread to handle block device monitoring for
229 	 * export to the cluster.
230 	 */
231 	pthread_create(&thread, NULL, udev_thread, NULL);
232 
233 	/*
234 	 * Start up a thread to tie /dev/xdisk into the cluster
235 	 * controller.
236 	 */
237 	xdisk_connect();
238 
239 	/*
240 	 * Start thread to manage /etc/hammer2/autoconn
241 	 */
242 	pthread_create(&thread, NULL, autoconn_thread, NULL);
243 
244 	/*
245 	 * Scan existing hammer2 mounts and reconnect to them using
246 	 * HAMMER2IOC_RECLUSTER.
247 	 */
248 	count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
249 	for (i = 0; i < count; ++i) {
250 		if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
251 			master_reconnect(mntbuf[i].f_mntonname);
252 	}
253 
254 	/*
255 	 * Accept connections and create pthreads to handle them after
256 	 * validating the IP.
257 	 */
258 	for (;;) {
259 		alen = sizeof(asin);
260 		fd = accept(lfd, (struct sockaddr *)&asin, &alen);
261 		if (fd < 0) {
262 			if (errno == EINTR)
263 				continue;
264 			break;
265 		}
266 		opt = 1;
267 		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
268 		fprintf(stderr, "service_thread: accept fd %d\n", fd);
269 		info = malloc(sizeof(*info));
270 		bzero(info, sizeof(*info));
271 		info->fd = fd;
272 		info->detachme = 1;
273 		info->usrmsg_callback = hammer2_usrmsg_handler;
274 		info->label = strdup("client");
275 		pthread_create(&thread, NULL, dmsg_master_service, info);
276 	}
277 	return (NULL);
278 }
279 
280 /*
281  * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
282  * for responding for the message, otherwise if it is not set libdmsg has
283  * already done some preprocessing and will respond to the message for us
284  * when we return.
285  *
286  * We primarily monitor for VOLCONFs
287  */
288 static
289 void
290 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
291 {
292 	dmsg_state_t *state;
293 	hammer2_media_config_t *conf;
294 	dmsg_lnk_hammer2_volconf_t *msgconf;
295 	int i;
296 
297 	/*
298 	 * Only process messages which are part of a LNK_CONN stream
299 	 */
300 	state = msg->state;
301 	if (state == NULL ||
302 	    (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
303 		hammer2_shell_parse(msg, unmanaged);
304 		return;
305 	}
306 
307 	switch(msg->tcmd) {
308 	case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
309 	case DMSG_LNK_CONN | DMSGF_DELETE:
310 	case DMSG_LNK_ERROR | DMSGF_DELETE:
311 		/*
312 		 * Deleting connection, clean out all volume configs
313 		 */
314 		if (state->media == NULL || state->media->usrhandle == NULL)
315 			break;
316 		conf = state->media->usrhandle;
317 		fprintf(stderr, "Shutting down media spans\n");
318 		for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
319 			if (conf[i].thread) {
320 				conf[i].ctl = H2CONFCTL_STOP;
321 				pthread_cond_signal(&conf[i].cond);
322 			}
323 		}
324 		for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
325 			if (conf[i].thread) {
326 				pthread_join(conf[i].thread, NULL);
327 				conf->thread_started = 0;
328 				pthread_cond_destroy(&conf[i].cond);
329 			}
330 		}
331 		state->media->usrhandle = NULL;
332 		free(conf);
333 		break;
334 	case DMSG_LNK_HAMMER2_VOLCONF:
335 		/*
336 		 * One-way volume-configuration message is transmitted
337 		 * over the open LNK_CONN transaction.
338 		 */
339 		fprintf(stderr, "RECEIVED VOLCONF\n");
340 
341 		if ((conf = state->media->usrhandle) == NULL) {
342 			conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
343 			bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
344 			state->media->usrhandle = conf;
345 		}
346 		msgconf = H2_LNK_VOLCONF(msg);
347 
348 		if (msgconf->index < 0 ||
349 		    msgconf->index >= HAMMER2_COPYID_COUNT) {
350 			fprintf(stderr,
351 				"VOLCONF: ILLEGAL INDEX %d\n",
352 				msgconf->index);
353 			break;
354 		}
355 		if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
356 		    msgconf->copy.path[0] == 0) {
357 			fprintf(stderr,
358 				"VOLCONF: ILLEGAL PATH %d\n",
359 				msgconf->index);
360 			break;
361 		}
362 		conf += msgconf->index;
363 		pthread_mutex_lock(&confmtx);
364 		conf->copy_pend = msgconf->copy;
365 		conf->ctl |= H2CONFCTL_UPDATE;
366 		pthread_mutex_unlock(&confmtx);
367 		if (conf->thread_started == 0) {
368 			fprintf(stderr, "VOLCONF THREAD STARTED\n");
369 			pthread_cond_init(&conf->cond, NULL);
370 			pthread_create(&conf->thread, NULL,
371 				       hammer2_volconf_thread, (void *)conf);
372 			conf->thread_started = 1;
373 		}
374 		pthread_cond_signal(&conf->cond);
375 		break;
376 	default:
377 		if (unmanaged)
378 			dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
379 		break;
380 	}
381 }
382 
383 static void *
384 hammer2_volconf_thread(void *info)
385 {
386 	hammer2_media_config_t *conf = info;
387 
388 	setproctitle("hammer2 volconf");
389 
390 	pthread_mutex_lock(&confmtx);
391 	while ((conf->ctl & H2CONFCTL_STOP) == 0) {
392 		if (conf->ctl & H2CONFCTL_UPDATE) {
393 			fprintf(stderr, "VOLCONF UPDATE\n");
394 			conf->ctl &= ~H2CONFCTL_UPDATE;
395 			if (bcmp(&conf->copy_run, &conf->copy_pend,
396 				 sizeof(conf->copy_run)) == 0) {
397 				fprintf(stderr, "VOLCONF: no changes\n");
398 				continue;
399 			}
400 			/*
401 			 * XXX TODO - auto reconnect on lookup failure or
402 			 *		connect failure or stream failure.
403 			 */
404 
405 			pthread_mutex_unlock(&confmtx);
406 			hammer2_volconf_stop(conf);
407 			conf->copy_run = conf->copy_pend;
408 			if (conf->copy_run.copyid != 0 &&
409 			    strncmp((char*)conf->copy_run.path, "span:", 5) == 0) {
410 				hammer2_volconf_start(conf,
411 						      (char*)conf->copy_run.path + 5);
412 			}
413 			pthread_mutex_lock(&confmtx);
414 			fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
415 		}
416 		if (conf->state == H2MC_CONNECT) {
417 			hammer2_volconf_start(conf, (char*)conf->copy_run.path + 5);
418 			pthread_mutex_unlock(&confmtx);
419 			sleep(5);
420 			pthread_mutex_lock(&confmtx);
421 		} else {
422 			pthread_cond_wait(&conf->cond, &confmtx);
423 		}
424 	}
425 	pthread_mutex_unlock(&confmtx);
426 	hammer2_volconf_stop(conf);
427 	return(NULL);
428 }
429 
430 static
431 void
432 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
433 {
434 	dmsg_master_service_info_t *info;
435 
436 	switch(conf->state) {
437 	case H2MC_STOPPED:
438 	case H2MC_CONNECT:
439 		conf->fd = dmsg_connect(hostname);
440 		if (conf->fd < 0) {
441 			fprintf(stderr, "Unable to connect to %s\n", hostname);
442 			conf->state = H2MC_CONNECT;
443 		} else if (pipe(conf->pipefd) < 0) {
444 			close(conf->fd);
445 			fprintf(stderr, "pipe() failed during volconf\n");
446 			conf->state = H2MC_CONNECT;
447 		} else {
448 			fprintf(stderr, "VOLCONF CONNECT\n");
449 			info = malloc(sizeof(*info));
450 			bzero(info, sizeof(*info));
451 			info->fd = conf->fd;
452 			info->altfd = conf->pipefd[0];
453 			info->altmsg_callback = hammer2_volconf_signal;
454 			info->usrmsg_callback = hammer2_usrmsg_handler;
455 			info->detachme = 0;
456 			conf->state = H2MC_RUNNING;
457 			pthread_create(&conf->iocom_thread, NULL,
458 				       dmsg_master_service, info);
459 		}
460 		break;
461 	case H2MC_RUNNING:
462 		break;
463 	}
464 }
465 
466 static
467 void
468 hammer2_volconf_stop(hammer2_media_config_t *conf)
469 {
470 	switch(conf->state) {
471 	case H2MC_STOPPED:
472 		break;
473 	case H2MC_CONNECT:
474 		conf->state = H2MC_STOPPED;
475 		break;
476 	case H2MC_RUNNING:
477 		close(conf->pipefd[1]);
478 		conf->pipefd[1] = -1;
479 		pthread_join(conf->iocom_thread, NULL);
480 		conf->state = H2MC_STOPPED;
481 		break;
482 	}
483 }
484 
485 static
486 void
487 hammer2_volconf_signal(dmsg_iocom_t *iocom)
488 {
489 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
490 }
491 
492 /*
493  * Monitor block devices.  Currently polls every ~10 seconds or so.
494  */
495 static
496 void *
497 udev_thread(void *data __unused)
498 {
499 	int	fd;
500 	int	seq = 0;
501 
502 	pthread_detach(pthread_self());
503 	setproctitle("hammer2 udev_thread");
504 
505 	if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
506 		fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
507 			UDEV_DEVICE_PATH);
508 		pthread_exit(NULL);
509 	}
510 	udev_check_disks();
511 	while (ioctl(fd, UDEVWAIT, &seq) == 0) {
512 		udev_check_disks();
513 		sleep(1);
514 	}
515 	return (NULL);
516 }
517 
518 static void *autoconn_connect_thread(void *data);
519 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
520 
521 static
522 void *
523 autoconn_thread(void *data __unused)
524 {
525 	TAILQ_HEAD(, autoconn) autolist;
526 	struct autoconn *ac;
527 	struct autoconn *next;
528 	pthread_t thread;
529 	struct stat st;
530 	time_t	t;
531 	time_t	lmod;
532 	int	found_last;
533 	FILE	*fp;
534 	char	buf[256];
535 
536 	TAILQ_INIT(&autolist);
537 	found_last = 0;
538 	lmod = 0;
539 
540 	pthread_detach(pthread_self());
541 	setproctitle("hammer2 autoconn_thread");
542 	for (;;) {
543 		/*
544 		 * Polling interval
545 		 */
546 		sleep(5);
547 
548 		/*
549 		 * Poll the file.  Loop up if the synchronized state (lmod)
550 		 * has not changed.
551 		 */
552 		if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
553 			if (lmod == st.st_mtime)
554 				continue;
555 			fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
556 			if (fp == NULL)
557 				continue;
558 		} else {
559 			if (lmod == 0)
560 				continue;
561 			fp = NULL;
562 		}
563 
564 		/*
565 		 * Wait at least 5 seconds after the file is created or
566 		 * removed.
567 		 *
568 		 * Do not update the synchronized state.
569 		 */
570 		if (fp == NULL && found_last) {
571 			found_last = 0;
572 			continue;
573 		} else if (fp && found_last == 0) {
574 			fclose(fp);
575 			found_last = 1;
576 			continue;
577 		}
578 
579 		/*
580 		 * Don't scan the file until the time progresses past the
581 		 * file's mtime, so we can validate that the file was not
582 		 * further modified during our scan.
583 		 *
584 		 * Do not update the synchronized state.
585 		 */
586 		time(&t);
587 		if (fp) {
588 			if (t == st.st_mtime) {
589 				fclose(fp);
590 				continue;
591 			}
592 			t = st.st_mtime;
593 		} else {
594 			t = 0;
595 		}
596 
597 		/*
598 		 * Set staging to disconnect, then scan the file.
599 		 */
600 		TAILQ_FOREACH(ac, &autolist, entry)
601 			ac->stage = 0;
602 		while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
603 			char *host;
604 
605 			if ((host = strtok(buf, " \t\r\n")) == NULL ||
606 			    host[0] == '#') {
607 				continue;
608 			}
609 			TAILQ_FOREACH(ac, &autolist, entry) {
610 				if (strcmp(host, ac->host) == 0)
611 					break;
612 			}
613 			if (ac == NULL) {
614 				ac = malloc(sizeof(*ac));
615 				bzero(ac, sizeof(*ac));
616 				ac->host = strdup(host);
617 				ac->state = AUTOCONN_INACTIVE;
618 				TAILQ_INSERT_TAIL(&autolist, ac, entry);
619 			}
620 			ac->stage = 1;
621 		}
622 
623 		/*
624 		 * Ignore the scan (and retry again) if the file was
625 		 * modified during the scan.
626 		 *
627 		 * Do not update the synchronized state.
628 		 */
629 		if (fp) {
630 			if (fstat(fileno(fp), &st) < 0) {
631 				fclose(fp);
632 				continue;
633 			}
634 			fclose(fp);
635 			if (t != st.st_mtime)
636 				continue;
637 		}
638 
639 		/*
640 		 * Update the synchronized state and reconfigure the
641 		 * connect list as needed.
642 		 */
643 		lmod = t;
644 		next = TAILQ_FIRST(&autolist);
645 		while ((ac = next) != NULL) {
646 			next = TAILQ_NEXT(ac, entry);
647 
648 			/*
649 			 * Staging, initiate
650 			 */
651 			if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
652 				if (pipe(ac->pipefd) == 0) {
653 					ac->stopme = 0;
654 					ac->state = AUTOCONN_ACTIVE;
655 					pthread_create(&thread, NULL,
656 						       autoconn_connect_thread,
657 						       ac);
658 				}
659 			}
660 
661 			/*
662 			 * Unstaging, stop active connection.
663 			 *
664 			 * We write to the pipe which causes the iocom_core
665 			 * to call autoconn_disconnect_signal().
666 			 */
667 			if (ac->stage == 0 &&
668 			    ac->state == AUTOCONN_ACTIVE) {
669 				if (ac->stopme == 0) {
670 					char dummy = 0;
671 					ac->stopme = 1;
672 					write(ac->pipefd[1], &dummy, 1);
673 				}
674 			}
675 
676 			/*
677 			 * Unstaging, delete inactive connection.
678 			 */
679 			if (ac->stage == 0 &&
680 			    ac->state == AUTOCONN_INACTIVE) {
681 				TAILQ_REMOVE(&autolist, ac, entry);
682 				free(ac->host);
683 				free(ac);
684 				continue;
685 			}
686 		}
687 		sleep(5);
688 	}
689 	return(NULL);
690 }
691 
692 static
693 void *
694 autoconn_connect_thread(void *data)
695 {
696 	dmsg_master_service_info_t *info;
697 	struct autoconn *ac;
698 	void *res;
699 	int fd;
700 
701 	ac = data;
702 	pthread_detach(pthread_self());
703 	setproctitle("hammer2 dmsg");
704 
705 	while (ac->stopme == 0) {
706 		fd = dmsg_connect(ac->host);
707 		if (fd < 0) {
708 			if (DMsgDebugOpt > 2) {
709 				fprintf(stderr,
710 					"autoconn: Connect failure: %s\n",
711 					ac->host);
712 			}
713 			sleep(5);
714 			continue;
715 		}
716 		fprintf(stderr, "autoconn: Connect %s\n", ac->host);
717 
718 		info = malloc(sizeof(*info));
719 		bzero(info, sizeof(*info));
720 		info->fd = fd;
721 		info->altfd = ac->pipefd[0];
722 		info->altmsg_callback = autoconn_disconnect_signal;
723 		info->usrmsg_callback = hammer2_usrmsg_handler;
724 		info->detachme = 0;
725 		info->noclosealt = 1;
726 		pthread_create(&ac->thread, NULL, dmsg_master_service, info);
727 		pthread_join(ac->thread, &res);
728 	}
729 	close(ac->pipefd[0]);
730 	ac->state = AUTOCONN_INACTIVE;
731 	/* auto structure can be ripped out here */
732 	return(NULL);
733 }
734 
735 static
736 void
737 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
738 {
739 	fprintf(stderr, "autoconn: Shutting down socket\n");
740 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
741 }
742 
743 /*
744  * Retrieve the list of disk attachments and attempt to export
745  * them.
746  */
747 static
748 void
749 udev_check_disks(void)
750 {
751 	char tmpbuf[1024];
752 	char *buf = NULL;
753 	char *disk;
754 	int error;
755 	size_t n;
756 
757 	for (;;) {
758 		n = 0;
759 		error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
760 		if (error < 0 || n == 0)
761 			break;
762 		if (n >= sizeof(tmpbuf))
763 			buf = malloc(n + 1);
764 		else
765 			buf = tmpbuf;
766 		error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
767 		if (error == 0) {
768 			buf[n] = 0;
769 			break;
770 		}
771 		if (buf != tmpbuf) {
772 			free(buf);
773 			buf = NULL;
774 		}
775 		if (errno != ENOMEM)
776 			break;
777 	}
778 	if (buf) {
779 		fprintf(stderr, "DISKS: %s\n", buf);
780 		for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
781 			disk_reconnect(disk);
782 		}
783 		if (buf != tmpbuf)
784 			free(buf);
785 	}
786 }
787 
788 /*
789  * Normally the mount program supplies a cluster communications
790  * descriptor to the hammer2 vfs on mount, but if you kill the service
791  * daemon and restart it that link will be lost.
792  *
793  * This procedure attempts to [re]connect to existing mounts when
794  * the service daemon is started up before going into its accept
795  * loop.
796  *
797  * NOTE: A hammer2 mount point can only accomodate one connection at a time
798  *	 so this will disconnect any existing connection during the
799  *	 reconnect.
800  */
801 static
802 void
803 master_reconnect(const char *mntpt)
804 {
805 	struct hammer2_ioc_recluster recls;
806 	dmsg_master_service_info_t *info;
807 	pthread_t thread;
808 	int fd;
809 	int pipefds[2];
810 
811 	fd = open(mntpt, O_RDONLY);
812 	if (fd < 0) {
813 		fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
814 		return;
815 	}
816 	if (pipe(pipefds) < 0) {
817 		fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
818 		close(fd);
819 		return;
820 	}
821 	bzero(&recls, sizeof(recls));
822 	recls.fd = pipefds[0];
823 	if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
824 		fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
825 		close(pipefds[0]);
826 		close(pipefds[1]);
827 		close(fd);
828 		return;
829 	}
830 	close(pipefds[0]);
831 	close(fd);
832 
833 	info = malloc(sizeof(*info));
834 	bzero(info, sizeof(*info));
835 	info->fd = pipefds[1];
836 	info->detachme = 1;
837 	info->usrmsg_callback = hammer2_usrmsg_handler;
838 	info->label = strdup("hammer2");
839 	pthread_create(&thread, NULL, dmsg_master_service, info);
840 }
841 
842 /*
843  * Reconnect a physical disk service to the mesh.
844  */
845 static
846 void
847 disk_reconnect(const char *disk)
848 {
849 	struct disk_ioc_recluster recls;
850 	struct diskcon *dc;
851 	dmsg_master_service_info_t *info;
852 	pthread_t thread;
853 	int fd;
854 	int pipefds[2];
855 	char *path;
856 
857 	/*
858 	 * Urm, this will auto-create mdX+1, just ignore for now.
859 	 * This mechanic needs to be fixed.  It might actually be nice
860 	 * to be able to export md disks.
861 	 */
862 	if (strncmp(disk, "md", 2) == 0)
863 		return;
864 	if (strncmp(disk, "xa", 2) == 0)
865 		return;
866 
867 	/*
868 	 * Check if already connected
869 	 */
870 	pthread_mutex_lock(&diskmtx);
871 	TAILQ_FOREACH(dc, &diskconq, entry) {
872 		if (strcmp(dc->disk, disk) == 0)
873 			break;
874 	}
875 	pthread_mutex_unlock(&diskmtx);
876 	if (dc)
877 		return;
878 
879 	/*
880 	 * Not already connected, create a connection to the kernel
881 	 * disk driver.
882 	 */
883 	asprintf(&path, "/dev/%s", disk);
884 	fd = open(path, O_RDONLY);
885 	if (fd < 0) {
886 		fprintf(stderr, "reconnect %s: no access to disk\n", disk);
887 		free(path);
888 		return;
889 	}
890 	free(path);
891 	if (pipe(pipefds) < 0) {
892 		fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
893 		close(fd);
894 		return;
895 	}
896 	bzero(&recls, sizeof(recls));
897 	recls.fd = pipefds[0];
898 	if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
899 		fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
900 		close(pipefds[0]);
901 		close(pipefds[1]);
902 		close(fd);
903 		return;
904 	}
905 	close(pipefds[0]);
906 	close(fd);
907 
908 	dc = malloc(sizeof(*dc));
909 	dc->disk = strdup(disk);
910 	pthread_mutex_lock(&diskmtx);
911 	TAILQ_INSERT_TAIL(&diskconq, dc, entry);
912 	pthread_mutex_unlock(&diskmtx);
913 
914 	info = malloc(sizeof(*info));
915 	bzero(info, sizeof(*info));
916 	info->fd = pipefds[1];
917 	info->detachme = 1;
918 	info->usrmsg_callback = hammer2_usrmsg_handler;
919 	info->exit_callback = disk_disconnect;
920 	info->handle = dc;
921 	info->label = strdup(dc->disk);
922 	pthread_create(&thread, NULL, dmsg_master_service, info);
923 }
924 
925 static
926 void
927 disk_disconnect(void *handle)
928 {
929 	struct diskcon *dc = handle;
930 
931 	fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
932 
933 	pthread_mutex_lock(&diskmtx);
934 	TAILQ_REMOVE(&diskconq, dc, entry);
935 	pthread_mutex_unlock(&diskmtx);
936 	free(dc->disk);
937 	free(dc);
938 }
939 
940 /*
941  * Connect our cluster controller to /dev/xdisk.  xdisk will pick up
942  * SPAN messages that we route to it, makes remote block devices
943  * available to the host, and can issue dmsg transactions based on
944  * device requests.
945  */
946 static
947 void
948 xdisk_connect(void)
949 {
950 	dmsg_master_service_info_t *info;
951 	struct xdisk_attach_ioctl xaioc;
952 	pthread_t thread;
953 	int pipefds[2];
954 	int xfd;
955 	int error;
956 
957 	/*
958 	 * Is /dev/xdisk available?
959 	 */
960 	xfd = open("/dev/xdisk", O_RDWR, 0600);
961 	if (xfd < 0) {
962 		fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
963 		return;
964 	}
965 
966 	if (pipe(pipefds) < 0) {
967 		fprintf(stderr, "xdisk_connect: pipe() failed\n");
968 		return;
969 	}
970 
971 	/*
972 	 * Pipe between cluster controller (this user process).
973 	 */
974 	info = malloc(sizeof(*info));
975 	bzero(info, sizeof(*info));
976 	info->fd = pipefds[1];
977 	info->detachme = 1;
978 	info->usrmsg_callback = hammer2_usrmsg_handler;
979 	info->exit_callback = NULL;
980 	pthread_create(&thread, NULL, dmsg_master_service, info);
981 
982 	/*
983 	 * And the xdisk device.
984 	 */
985 	bzero(&xaioc, sizeof(xaioc));
986 	xaioc.fd = pipefds[0];
987 	error = ioctl(xfd, XDISKIOCATTACH, &xaioc);
988 	close(pipefds[0]);
989 	close(xfd);
990 
991 	if (error < 0) {
992 		fprintf(stderr,
993 			"xdisk_connect: cannot attach %s\n",
994 			strerror(errno));
995 		return;
996 	}
997 }
998 
999 /*
1000  * Execute the specified function as a detached independent process/daemon,
1001  * unless we are in debug mode.  If we are in debug mode the function is
1002  * executed as a pthread in the current process.
1003  */
1004 void
1005 hammer2_demon(void *(*func)(void *), void *arg)
1006 {
1007 	pthread_t thread;
1008 	pid_t pid;
1009 	int ttyfd;
1010 
1011 	/*
1012 	 * Do not disconnect in debug mode
1013 	 */
1014 	if (DebugOpt) {
1015                 pthread_create(&thread, NULL, func, arg);
1016 		NormalExit = 0;
1017 		return;
1018 	}
1019 
1020 	/*
1021 	 * Otherwise disconnect us.  Double-fork to get rid of the ppid
1022 	 * association and disconnect the TTY.
1023 	 */
1024 	if ((pid = fork()) < 0) {
1025 		fprintf(stderr, "hammer2: fork(): %s\n", strerror(errno));
1026 		exit(1);
1027 	}
1028 	if (pid > 0) {
1029 		while (waitpid(pid, NULL, 0) != pid)
1030 			;
1031 		return;		/* parent returns */
1032 	}
1033 
1034 	/*
1035 	 * Get rid of the TTY/session before double-forking to finish off
1036 	 * the ppid.
1037 	 */
1038 	ttyfd = open("/dev/null", O_RDWR);
1039 	if (ttyfd >= 0) {
1040 		if (ttyfd != 0)
1041 			dup2(ttyfd, 0);
1042 		if (ttyfd != 1)
1043 			dup2(ttyfd, 1);
1044 		if (ttyfd != 2)
1045 			dup2(ttyfd, 2);
1046 		if (ttyfd > 2)
1047 			close(ttyfd);
1048 	}
1049 
1050 	ttyfd = open("/dev/tty", O_RDWR);
1051 	if (ttyfd >= 0) {
1052 		ioctl(ttyfd, TIOCNOTTY, 0);
1053 		close(ttyfd);
1054 	}
1055 	setsid();
1056 
1057 	/*
1058 	 * Second fork to disconnect ppid (the original parent waits for
1059 	 * us to exit).
1060 	 */
1061 	if ((pid = fork()) < 0) {
1062 		_exit(2);
1063 	}
1064 	if (pid > 0)
1065 		_exit(0);
1066 
1067 	/*
1068 	 * The double child
1069 	 */
1070 	setsid();
1071 	pthread_create(&thread, NULL, func, arg);
1072 	pthread_exit(NULL);
1073 	_exit(2);	/* NOT REACHED */
1074 }
1075