xref: /dragonfly/sbin/hammer2/cmd_service.c (revision 3c7e5806)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "hammer2.h"
37 
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40 
41 struct hammer2_media_config {
42 	hammer2_volconf_t	copy_run;
43 	hammer2_volconf_t	copy_pend;
44 	pthread_t		thread;
45 	pthread_cond_t		cond;
46 	int			ctl;
47 	int			fd;
48 	int			pipefd[2];      /* signal stop */
49 	dmsg_iocom_t		iocom;
50 	pthread_t		iocom_thread;
51 	enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
52 };
53 
54 typedef struct hammer2_media_config hammer2_media_config_t;
55 
56 #define H2CONFCTL_STOP          0x00000001
57 #define H2CONFCTL_UPDATE        0x00000002
58 
59 struct diskcon {
60 	TAILQ_ENTRY(diskcon) entry;
61 	char	*disk;
62 };
63 
64 struct service_node_opaque {
65 	char	cl_label[64];
66 	char	fs_label[64];
67 	dmsg_media_block_t block;
68 	int	attached;
69 	int	servicing;
70 	int	servicefd;
71 };
72 
73 struct autoconn {
74 	TAILQ_ENTRY(autoconn) entry;
75 	char	*host;
76 	int	stage;
77 	int	stopme;
78 	int	pipefd[2];	/* {read,write} */
79 	enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
80 	pthread_t thread;
81 };
82 
83 #define WS " \r\n"
84 
85 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
86 static pthread_mutex_t diskmtx;
87 static pthread_mutex_t confmtx;
88 
89 static void *service_thread(void *data);
90 static void *udev_thread(void *data);
91 static void *autoconn_thread(void *data);
92 static void master_reconnect(const char *mntpt);
93 static void disk_reconnect(const char *disk);
94 static void disk_disconnect(void *handle);
95 static void udev_check_disks(void);
96 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
97 static void *hammer2_volconf_thread(void *info);
98 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
99 static void hammer2_volconf_start(hammer2_media_config_t *conf,
100 			const char *hostname);
101 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
102 
103 
104 static void xdisk_connect(void);
105 
106 /*
107  * Start-up the master listener daemon for the machine.  This daemon runs
108  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
109  * and directories for work.
110  *
111  * --
112  *
113  * The only purpose for the UDP discovery protocol is to determine what
114  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
115  * required to operate, but hostnames (if assigned) must be unique.  If
116  * no hostname is assigned the host's IP is used as the name.  This name
117  * is broadcast along with the mtime of the originator's private key.
118  *
119  * Receiving hammer2 service daemons which are able to match the label against
120  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
121  * to the target.  Removal of the file will cause a disconnection.  A failed
122  * public key negotiation stops further connection attempts until either the
123  * file is updated or the remote mtime is updated.
124  *
125  * Generally speaking this results in a web of connections, typically a
126  * combination of point-to-point for the more important links and relayed
127  * (spanning tree) for less important or filtered links.
128  *
129  * --
130  *
131  * The TCP listener serves as a rendezvous point in the cluster, accepting
132  * connections, performing registrations and authentications, maintaining
133  * the spanning tree, and keeping track of message state so disconnects can
134  * be handled properly.
135  *
136  * Once authenticated only low-level messaging protocols (which includes
137  * tracking persistent messages) are handled by this daemon.  This daemon
138  * does not run the higher level quorum or locking protocols.
139  *
140  * --
141  *
142  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
143  * to connect to (which do not have to be on the local lan).  This list will
144  * be retried until a connection can be established.  The file is not usually
145  * needed for linkages local to the LAN.
146  */
147 int
148 cmd_service(void)
149 {
150 	struct sockaddr_in lsin;
151 	int on;
152 	int lfd;
153 
154 	/*
155 	 * Acquire socket and set options
156 	 */
157 	if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
158 		fprintf(stderr, "master_listen: socket(): %s\n",
159 			strerror(errno));
160 		return 1;
161 	}
162 	on = 1;
163 	setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
164 
165 	/*
166 	 * Setup listen port and try to bind.  If the bind fails we assume
167 	 * that a master listener process is already running and silently
168 	 * fail.
169 	 */
170 	bzero(&lsin, sizeof(lsin));
171 	lsin.sin_family = AF_INET;
172 	lsin.sin_addr.s_addr = INADDR_ANY;
173 	lsin.sin_port = htons(DMSG_LISTEN_PORT);
174 	if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
175 		close(lfd);
176 		if (QuietOpt == 0) {
177 			fprintf(stderr,
178 				"master listen: daemon already running\n");
179 		}
180 		return 0;
181 	}
182 	if (QuietOpt == 0)
183 		fprintf(stderr, "master listen: startup\n");
184 	listen(lfd, 50);
185 
186 	/*
187 	 * Fork and disconnect the controlling terminal and parent process,
188 	 * executing the specified function as a pthread.
189 	 *
190 	 * Returns to the original process which can then continue running.
191 	 * In debug mode this call will create the pthread without forking
192 	 * and set NormalExit to 0, instead of fork.
193 	 */
194 	hammer2_demon(service_thread, (void *)(intptr_t)lfd);
195 	if (NormalExit)
196 		close(lfd);
197 	return 0;
198 }
199 
200 /*
201  * Master listen/accept thread.  Accept connections on the master socket,
202  * starting a pthread for each one.
203  */
204 static
205 void *
206 service_thread(void *data)
207 {
208 	struct sockaddr_in asin;
209 	socklen_t alen;
210 	pthread_t thread;
211 	dmsg_master_service_info_t *info;
212 	int lfd = (int)(intptr_t)data;
213 	int fd;
214 	int i;
215 	int count;
216 	int opt;
217 	struct statfs *mntbuf = NULL;
218 	struct statvfs *mntvbuf = NULL;
219 
220 	/*
221 	 * Nobody waits for us
222 	 */
223 	setproctitle("hammer2 master listen");
224 	pthread_detach(pthread_self());
225 
226 	/*
227 	 * Start up a thread to handle block device monitoring for
228 	 * export to the cluster.
229 	 */
230 	thread = NULL;
231 	pthread_create(&thread, NULL, udev_thread, NULL);
232 
233 	/*
234 	 * Start up a thread to tie /dev/xdisk into the cluster
235 	 * controller.
236 	 */
237 	xdisk_connect();
238 
239 	/*
240 	 * Start thread to manage /etc/hammer2/autoconn
241 	 */
242 	thread = NULL;
243 	pthread_create(&thread, NULL, autoconn_thread, NULL);
244 
245 	/*
246 	 * Scan existing hammer2 mounts and reconnect to them using
247 	 * HAMMER2IOC_RECLUSTER.
248 	 */
249 	count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
250 	for (i = 0; i < count; ++i) {
251 		if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
252 			master_reconnect(mntbuf[i].f_mntonname);
253 	}
254 
255 	/*
256 	 * Accept connections and create pthreads to handle them after
257 	 * validating the IP.
258 	 */
259 	for (;;) {
260 		alen = sizeof(asin);
261 		fd = accept(lfd, (struct sockaddr *)&asin, &alen);
262 		if (fd < 0) {
263 			if (errno == EINTR)
264 				continue;
265 			break;
266 		}
267 		opt = 1;
268 		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
269 		thread = NULL;
270 		fprintf(stderr, "service_thread: accept fd %d\n", fd);
271 		info = malloc(sizeof(*info));
272 		bzero(info, sizeof(*info));
273 		info->fd = fd;
274 		info->detachme = 1;
275 		info->usrmsg_callback = hammer2_usrmsg_handler;
276 		info->label = strdup("client");
277 		pthread_create(&thread, NULL, dmsg_master_service, info);
278 	}
279 	return (NULL);
280 }
281 
282 /*
283  * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
284  * for responding for the message, otherwise if it is not set libdmsg has
285  * already done some preprocessing and will respond to the message for us
286  * when we return.
287  *
288  * We primarily monitor for VOLCONFs
289  */
290 static
291 void
292 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
293 {
294 	dmsg_state_t *state;
295 	hammer2_media_config_t *conf;
296 	dmsg_lnk_hammer2_volconf_t *msgconf;
297 	int i;
298 
299 	/*
300 	 * Only process messages which are part of a LNK_CONN stream
301 	 */
302 	state = msg->state;
303 	if (state == NULL ||
304 	    (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
305 		hammer2_shell_parse(msg, unmanaged);
306 		return;
307 	}
308 
309 	switch(msg->tcmd) {
310 	case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
311 	case DMSG_LNK_CONN | DMSGF_DELETE:
312 	case DMSG_LNK_ERROR | DMSGF_DELETE:
313 		/*
314 		 * Deleting connection, clean out all volume configs
315 		 */
316 		if (state->media == NULL || state->media->usrhandle == NULL)
317 			break;
318 		conf = state->media->usrhandle;
319 		fprintf(stderr, "Shutting down media spans\n");
320 		for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
321 			if (conf[i].thread) {
322 				conf[i].ctl = H2CONFCTL_STOP;
323 				pthread_cond_signal(&conf[i].cond);
324 			}
325 		}
326 		for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
327 			if (conf[i].thread) {
328 				pthread_join(conf[i].thread, NULL);
329 				conf->thread = NULL;
330 				pthread_cond_destroy(&conf[i].cond);
331 			}
332 		}
333 		state->media->usrhandle = NULL;
334 		free(conf);
335 		break;
336 	case DMSG_LNK_HAMMER2_VOLCONF:
337 		/*
338 		 * One-way volume-configuration message is transmitted
339 		 * over the open LNK_CONN transaction.
340 		 */
341 		fprintf(stderr, "RECEIVED VOLCONF\n");
342 
343 		if ((conf = state->media->usrhandle) == NULL) {
344 			conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
345 			bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
346 			state->media->usrhandle = conf;
347 		}
348 		msgconf = H2_LNK_VOLCONF(msg);
349 
350 		if (msgconf->index < 0 ||
351 		    msgconf->index >= HAMMER2_COPYID_COUNT) {
352 			fprintf(stderr,
353 				"VOLCONF: ILLEGAL INDEX %d\n",
354 				msgconf->index);
355 			break;
356 		}
357 		if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
358 		    msgconf->copy.path[0] == 0) {
359 			fprintf(stderr,
360 				"VOLCONF: ILLEGAL PATH %d\n",
361 				msgconf->index);
362 			break;
363 		}
364 		conf += msgconf->index;
365 		pthread_mutex_lock(&confmtx);
366 		conf->copy_pend = msgconf->copy;
367 		conf->ctl |= H2CONFCTL_UPDATE;
368 		pthread_mutex_unlock(&confmtx);
369 		if (conf->thread == NULL) {
370 			fprintf(stderr, "VOLCONF THREAD STARTED\n");
371 			pthread_cond_init(&conf->cond, NULL);
372 			pthread_create(&conf->thread, NULL,
373 				       hammer2_volconf_thread, (void *)conf);
374 		}
375 		pthread_cond_signal(&conf->cond);
376 		break;
377 	default:
378 		if (unmanaged)
379 			dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
380 		break;
381 	}
382 }
383 
384 static void *
385 hammer2_volconf_thread(void *info)
386 {
387 	hammer2_media_config_t *conf = info;
388 
389 	setproctitle("hammer2 volconf");
390 
391 	pthread_mutex_lock(&confmtx);
392 	while ((conf->ctl & H2CONFCTL_STOP) == 0) {
393 		if (conf->ctl & H2CONFCTL_UPDATE) {
394 			fprintf(stderr, "VOLCONF UPDATE\n");
395 			conf->ctl &= ~H2CONFCTL_UPDATE;
396 			if (bcmp(&conf->copy_run, &conf->copy_pend,
397 				 sizeof(conf->copy_run)) == 0) {
398 				fprintf(stderr, "VOLCONF: no changes\n");
399 				continue;
400 			}
401 			/*
402 			 * XXX TODO - auto reconnect on lookup failure or
403 			 *		connect failure or stream failure.
404 			 */
405 
406 			pthread_mutex_unlock(&confmtx);
407 			hammer2_volconf_stop(conf);
408 			conf->copy_run = conf->copy_pend;
409 			if (conf->copy_run.copyid != 0 &&
410 			    strncmp(conf->copy_run.path, "span:", 5) == 0) {
411 				hammer2_volconf_start(conf,
412 						      conf->copy_run.path + 5);
413 			}
414 			pthread_mutex_lock(&confmtx);
415 			fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
416 		}
417 		if (conf->state == H2MC_CONNECT) {
418 			hammer2_volconf_start(conf, conf->copy_run.path + 5);
419 			pthread_mutex_unlock(&confmtx);
420 			sleep(5);
421 			pthread_mutex_lock(&confmtx);
422 		} else {
423 			pthread_cond_wait(&conf->cond, &confmtx);
424 		}
425 	}
426 	pthread_mutex_unlock(&confmtx);
427 	hammer2_volconf_stop(conf);
428 	return(NULL);
429 }
430 
431 static
432 void
433 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
434 {
435 	dmsg_master_service_info_t *info;
436 
437 	switch(conf->state) {
438 	case H2MC_STOPPED:
439 	case H2MC_CONNECT:
440 		conf->fd = dmsg_connect(hostname);
441 		if (conf->fd < 0) {
442 			fprintf(stderr, "Unable to connect to %s\n", hostname);
443 			conf->state = H2MC_CONNECT;
444 		} else if (pipe(conf->pipefd) < 0) {
445 			close(conf->fd);
446 			fprintf(stderr, "pipe() failed during volconf\n");
447 			conf->state = H2MC_CONNECT;
448 		} else {
449 			fprintf(stderr, "VOLCONF CONNECT\n");
450 			info = malloc(sizeof(*info));
451 			bzero(info, sizeof(*info));
452 			info->fd = conf->fd;
453 			info->altfd = conf->pipefd[0];
454 			info->altmsg_callback = hammer2_volconf_signal;
455 			info->usrmsg_callback = hammer2_usrmsg_handler;
456 			info->detachme = 0;
457 			conf->state = H2MC_RUNNING;
458 			pthread_create(&conf->iocom_thread, NULL,
459 				       dmsg_master_service, info);
460 		}
461 		break;
462 	case H2MC_RUNNING:
463 		break;
464 	}
465 }
466 
467 static
468 void
469 hammer2_volconf_stop(hammer2_media_config_t *conf)
470 {
471 	switch(conf->state) {
472 	case H2MC_STOPPED:
473 		break;
474 	case H2MC_CONNECT:
475 		conf->state = H2MC_STOPPED;
476 		break;
477 	case H2MC_RUNNING:
478 		close(conf->pipefd[1]);
479 		conf->pipefd[1] = -1;
480 		pthread_join(conf->iocom_thread, NULL);
481 		conf->iocom_thread = NULL;
482 		conf->state = H2MC_STOPPED;
483 		break;
484 	}
485 }
486 
487 static
488 void
489 hammer2_volconf_signal(dmsg_iocom_t *iocom)
490 {
491 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
492 }
493 
494 /*
495  * Monitor block devices.  Currently polls every ~10 seconds or so.
496  */
497 static
498 void *
499 udev_thread(void *data __unused)
500 {
501 	int	fd;
502 	int	seq = 0;
503 
504 	pthread_detach(pthread_self());
505 	setproctitle("hammer2 udev_thread");
506 
507 	if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
508 		fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
509 			UDEV_DEVICE_PATH);
510 		pthread_exit(NULL);
511 	}
512 	udev_check_disks();
513 	while (ioctl(fd, UDEVWAIT, &seq) == 0) {
514 		udev_check_disks();
515 		sleep(1);
516 	}
517 	return (NULL);
518 }
519 
520 static void *autoconn_connect_thread(void *data);
521 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
522 
523 static
524 void *
525 autoconn_thread(void *data __unused)
526 {
527 	TAILQ_HEAD(, autoconn) autolist;
528 	struct autoconn *ac;
529 	struct autoconn *next;
530 	pthread_t thread;
531 	struct stat st;
532 	time_t	t;
533 	time_t	lmod;
534 	int	found_last;
535 	FILE	*fp;
536 	char	buf[256];
537 
538 	TAILQ_INIT(&autolist);
539 	found_last = 0;
540 	lmod = 0;
541 
542 	pthread_detach(pthread_self());
543 	setproctitle("hammer2 autoconn_thread");
544 	for (;;) {
545 		/*
546 		 * Polling interval
547 		 */
548 		sleep(5);
549 
550 		/*
551 		 * Poll the file.  Loop up if the synchronized state (lmod)
552 		 * has not changed.
553 		 */
554 		if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
555 			if (lmod == st.st_mtime)
556 				continue;
557 			fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
558 			if (fp == NULL)
559 				continue;
560 		} else {
561 			if (lmod == 0)
562 				continue;
563 			fp = NULL;
564 		}
565 
566 		/*
567 		 * Wait at least 5 seconds after the file is created or
568 		 * removed.
569 		 *
570 		 * Do not update the synchronized state.
571 		 */
572 		if (fp == NULL && found_last) {
573 			found_last = 0;
574 			continue;
575 		} else if (fp && found_last == 0) {
576 			fclose(fp);
577 			found_last = 1;
578 			continue;
579 		}
580 
581 		/*
582 		 * Don't scan the file until the time progresses past the
583 		 * file's mtime, so we can validate that the file was not
584 		 * further modified during our scan.
585 		 *
586 		 * Do not update the synchronized state.
587 		 */
588 		time(&t);
589 		if (fp) {
590 			if (t == st.st_mtime) {
591 				fclose(fp);
592 				continue;
593 			}
594 			t = st.st_mtime;
595 		} else {
596 			t = 0;
597 		}
598 
599 		/*
600 		 * Set staging to disconnect, then scan the file.
601 		 */
602 		TAILQ_FOREACH(ac, &autolist, entry)
603 			ac->stage = 0;
604 		while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
605 			char *host;
606 
607 			if ((host = strtok(buf, " \t\r\n")) == NULL ||
608 			    host[0] == '#') {
609 				continue;
610 			}
611 			TAILQ_FOREACH(ac, &autolist, entry) {
612 				if (strcmp(host, ac->host) == 0)
613 					break;
614 			}
615 			if (ac == NULL) {
616 				ac = malloc(sizeof(*ac));
617 				bzero(ac, sizeof(*ac));
618 				ac->host = strdup(host);
619 				ac->state = AUTOCONN_INACTIVE;
620 				TAILQ_INSERT_TAIL(&autolist, ac, entry);
621 			}
622 			ac->stage = 1;
623 		}
624 
625 		/*
626 		 * Ignore the scan (and retry again) if the file was
627 		 * modified during the scan.
628 		 *
629 		 * Do not update the synchronized state.
630 		 */
631 		if (fp) {
632 			if (fstat(fileno(fp), &st) < 0) {
633 				fclose(fp);
634 				continue;
635 			}
636 			fclose(fp);
637 			if (t != st.st_mtime)
638 				continue;
639 		}
640 
641 		/*
642 		 * Update the synchronized state and reconfigure the
643 		 * connect list as needed.
644 		 */
645 		lmod = t;
646 		next = TAILQ_FIRST(&autolist);
647 		while ((ac = next) != NULL) {
648 			next = TAILQ_NEXT(ac, entry);
649 
650 			/*
651 			 * Staging, initiate
652 			 */
653 			if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
654 				if (pipe(ac->pipefd) == 0) {
655 					ac->stopme = 0;
656 					ac->state = AUTOCONN_ACTIVE;
657 					thread = NULL;
658 					pthread_create(&thread, NULL,
659 						       autoconn_connect_thread,
660 						       ac);
661 				}
662 			}
663 
664 			/*
665 			 * Unstaging, stop active connection.
666 			 *
667 			 * We write to the pipe which causes the iocom_core
668 			 * to call autoconn_disconnect_signal().
669 			 */
670 			if (ac->stage == 0 &&
671 			    ac->state == AUTOCONN_ACTIVE) {
672 				if (ac->stopme == 0) {
673 					char dummy = 0;
674 					ac->stopme = 1;
675 					write(ac->pipefd[1], &dummy, 1);
676 				}
677 			}
678 
679 			/*
680 			 * Unstaging, delete inactive connection.
681 			 */
682 			if (ac->stage == 0 &&
683 			    ac->state == AUTOCONN_INACTIVE) {
684 				TAILQ_REMOVE(&autolist, ac, entry);
685 				free(ac->host);
686 				free(ac);
687 				continue;
688 			}
689 		}
690 		sleep(5);
691 	}
692 	return(NULL);
693 }
694 
695 static
696 void *
697 autoconn_connect_thread(void *data)
698 {
699 	dmsg_master_service_info_t *info;
700 	struct autoconn *ac;
701 	void *res;
702 	int fd;
703 
704 	ac = data;
705 	pthread_detach(pthread_self());
706 	setproctitle("hammer2 dmsg");
707 
708 	while (ac->stopme == 0) {
709 		fd = dmsg_connect(ac->host);
710 		if (fd < 0) {
711 			if (DMsgDebugOpt > 2) {
712 				fprintf(stderr,
713 					"autoconn: Connect failure: %s\n",
714 					ac->host);
715 			}
716 			sleep(5);
717 			continue;
718 		}
719 		fprintf(stderr, "autoconn: Connect %s\n", ac->host);
720 
721 		info = malloc(sizeof(*info));
722 		bzero(info, sizeof(*info));
723 		info->fd = fd;
724 		info->altfd = ac->pipefd[0];
725 		info->altmsg_callback = autoconn_disconnect_signal;
726 		info->usrmsg_callback = hammer2_usrmsg_handler;
727 		info->detachme = 0;
728 		info->noclosealt = 1;
729 		pthread_create(&ac->thread, NULL, dmsg_master_service, info);
730 		pthread_join(ac->thread, &res);
731 	}
732 	close(ac->pipefd[0]);
733 	ac->state = AUTOCONN_INACTIVE;
734 	/* auto structure can be ripped out here */
735 	return(NULL);
736 }
737 
738 static
739 void
740 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
741 {
742 	fprintf(stderr, "autoconn: Shutting down socket\n");
743 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
744 }
745 
746 /*
747  * Retrieve the list of disk attachments and attempt to export
748  * them.
749  */
750 static
751 void
752 udev_check_disks(void)
753 {
754 	char tmpbuf[1024];
755 	char *buf = NULL;
756 	char *disk;
757 	int error;
758 	size_t n;
759 
760 	for (;;) {
761 		n = 0;
762 		error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
763 		if (error < 0 || n == 0)
764 			break;
765 		if (n >= sizeof(tmpbuf))
766 			buf = malloc(n + 1);
767 		else
768 			buf = tmpbuf;
769 		error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
770 		if (error == 0) {
771 			buf[n] = 0;
772 			break;
773 		}
774 		if (buf != tmpbuf) {
775 			free(buf);
776 			buf = NULL;
777 		}
778 		if (errno != ENOMEM)
779 			break;
780 	}
781 	if (buf) {
782 		fprintf(stderr, "DISKS: %s\n", buf);
783 		for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
784 			disk_reconnect(disk);
785 		}
786 		if (buf != tmpbuf)
787 			free(buf);
788 	}
789 }
790 
791 /*
792  * Normally the mount program supplies a cluster communications
793  * descriptor to the hammer2 vfs on mount, but if you kill the service
794  * daemon and restart it that link will be lost.
795  *
796  * This procedure attempts to [re]connect to existing mounts when
797  * the service daemon is started up before going into its accept
798  * loop.
799  *
800  * NOTE: A hammer2 mount point can only accomodate one connection at a time
801  *	 so this will disconnect any existing connection during the
802  *	 reconnect.
803  */
804 static
805 void
806 master_reconnect(const char *mntpt)
807 {
808 	struct hammer2_ioc_recluster recls;
809 	dmsg_master_service_info_t *info;
810 	pthread_t thread;
811 	int fd;
812 	int pipefds[2];
813 
814 	fd = open(mntpt, O_RDONLY);
815 	if (fd < 0) {
816 		fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
817 		return;
818 	}
819 	if (pipe(pipefds) < 0) {
820 		fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
821 		close(fd);
822 		return;
823 	}
824 	bzero(&recls, sizeof(recls));
825 	recls.fd = pipefds[0];
826 	if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
827 		fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
828 		close(pipefds[0]);
829 		close(pipefds[1]);
830 		close(fd);
831 		return;
832 	}
833 	close(pipefds[0]);
834 	close(fd);
835 
836 	info = malloc(sizeof(*info));
837 	bzero(info, sizeof(*info));
838 	info->fd = pipefds[1];
839 	info->detachme = 1;
840 	info->usrmsg_callback = hammer2_usrmsg_handler;
841 	info->label = strdup("hammer2");
842 	pthread_create(&thread, NULL, dmsg_master_service, info);
843 }
844 
845 /*
846  * Reconnect a physical disk service to the mesh.
847  */
848 static
849 void
850 disk_reconnect(const char *disk)
851 {
852 	struct disk_ioc_recluster recls;
853 	struct diskcon *dc;
854 	dmsg_master_service_info_t *info;
855 	pthread_t thread;
856 	int fd;
857 	int pipefds[2];
858 	char *path;
859 
860 	/*
861 	 * Urm, this will auto-create mdX+1, just ignore for now.
862 	 * This mechanic needs to be fixed.  It might actually be nice
863 	 * to be able to export md disks.
864 	 */
865 	if (strncmp(disk, "md", 2) == 0)
866 		return;
867 	if (strncmp(disk, "xa", 2) == 0)
868 		return;
869 
870 	/*
871 	 * Check if already connected
872 	 */
873 	pthread_mutex_lock(&diskmtx);
874 	TAILQ_FOREACH(dc, &diskconq, entry) {
875 		if (strcmp(dc->disk, disk) == 0)
876 			break;
877 	}
878 	pthread_mutex_unlock(&diskmtx);
879 	if (dc)
880 		return;
881 
882 	/*
883 	 * Not already connected, create a connection to the kernel
884 	 * disk driver.
885 	 */
886 	asprintf(&path, "/dev/%s", disk);
887 	fd = open(path, O_RDONLY);
888 	if (fd < 0) {
889 		fprintf(stderr, "reconnect %s: no access to disk\n", disk);
890 		free(path);
891 		return;
892 	}
893 	free(path);
894 	if (pipe(pipefds) < 0) {
895 		fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
896 		close(fd);
897 		return;
898 	}
899 	bzero(&recls, sizeof(recls));
900 	recls.fd = pipefds[0];
901 	if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
902 		fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
903 		close(pipefds[0]);
904 		close(pipefds[1]);
905 		close(fd);
906 		return;
907 	}
908 	close(pipefds[0]);
909 	close(fd);
910 
911 	dc = malloc(sizeof(*dc));
912 	dc->disk = strdup(disk);
913 	pthread_mutex_lock(&diskmtx);
914 	TAILQ_INSERT_TAIL(&diskconq, dc, entry);
915 	pthread_mutex_unlock(&diskmtx);
916 
917 	info = malloc(sizeof(*info));
918 	bzero(info, sizeof(*info));
919 	info->fd = pipefds[1];
920 	info->detachme = 1;
921 	info->usrmsg_callback = hammer2_usrmsg_handler;
922 	info->exit_callback = disk_disconnect;
923 	info->handle = dc;
924 	info->label = strdup(dc->disk);
925 	pthread_create(&thread, NULL, dmsg_master_service, info);
926 }
927 
928 static
929 void
930 disk_disconnect(void *handle)
931 {
932 	struct diskcon *dc = handle;
933 
934 	fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
935 
936 	pthread_mutex_lock(&diskmtx);
937 	TAILQ_REMOVE(&diskconq, dc, entry);
938 	pthread_mutex_unlock(&diskmtx);
939 	free(dc->disk);
940 	free(dc);
941 }
942 
943 /*
944  * Connect our cluster controller to /dev/xdisk.  xdisk will pick up
945  * SPAN messages that we route to it, makes remote block devices
946  * available to the host, and can issue dmsg transactions based on
947  * device requests.
948  */
949 static
950 void
951 xdisk_connect(void)
952 {
953 	dmsg_master_service_info_t *info;
954 	struct xdisk_attach_ioctl xaioc;
955 	pthread_t thread;
956 	int pipefds[2];
957 	int xfd;
958 	int error;
959 
960 	/*
961 	 * Is /dev/xdisk available?
962 	 */
963 	xfd = open("/dev/xdisk", O_RDWR, 0600);
964 	if (xfd < 0) {
965 		fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
966 		return;
967 	}
968 
969 	if (pipe(pipefds) < 0) {
970 		fprintf(stderr, "xdisk_connect: pipe() failed\n");
971 		return;
972 	}
973 
974 	/*
975 	 * Pipe between cluster controller (this user process).
976 	 */
977 	info = malloc(sizeof(*info));
978 	bzero(info, sizeof(*info));
979 	info->fd = pipefds[1];
980 	info->detachme = 1;
981 	info->usrmsg_callback = hammer2_usrmsg_handler;
982 	info->exit_callback = NULL;
983 	pthread_create(&thread, NULL, dmsg_master_service, info);
984 
985 	/*
986 	 * And the xdisk device.
987 	 */
988 	bzero(&xaioc, sizeof(xaioc));
989 	xaioc.fd = pipefds[0];
990 	error = ioctl(xfd, XDISKIOCATTACH, &xaioc);
991 	close(pipefds[0]);
992 	close(xfd);
993 
994 	if (error < 0) {
995 		fprintf(stderr,
996 			"xdisk_connect: cannot attach %s\n",
997 			strerror(errno));
998 		return;
999 	}
1000 }
1001