xref: /dragonfly/sbin/hammer2/cmd_service.c (revision 33311965)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "hammer2.h"
37 
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40 
41 struct diskcon {
42 	TAILQ_ENTRY(diskcon) entry;
43 	char	*disk;
44 };
45 
46 struct service_node_opaque {
47 	char	cl_label[64];
48 	char	fs_label[64];
49 	dmsg_media_block_t block;
50 	int	attached;
51 	int	servicing;
52 	int	servicefd;
53 };
54 
55 struct autoconn {
56 	TAILQ_ENTRY(autoconn) entry;
57 	char	*host;
58 	int	stage;
59 	int	stopme;
60 	int	pipefd[2];	/* {read,write} */
61 	enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
62 	pthread_t thread;
63 };
64 
65 #define WS " \r\n"
66 
67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
68 pthread_mutex_t diskmtx;
69 
70 static void *service_thread(void *data);
71 static void *udev_thread(void *data);
72 static void *autoconn_thread(void *data);
73 static void master_reconnect(const char *mntpt);
74 static void disk_reconnect(const char *disk);
75 static void disk_disconnect(void *handle);
76 static void udev_check_disks(void);
77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
78 
79 static void xdisk_reconnect(struct service_node_opaque *info);
80 static void xdisk_disconnect(void *handle);
81 static void *xdisk_attach_tmpthread(void *data);
82 
83 /*
84  * Start-up the master listener daemon for the machine.  This daemon runs
85  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
86  * and directories for work.
87  *
88  * --
89  *
90  * The only purpose for the UDP discovery protocol is to determine what
91  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
92  * required to operate, but hostnames (if assigned) must be unique.  If
93  * no hostname is assigned the host's IP is used as the name.  This name
94  * is broadcast along with the mtime of the originator's private key.
95  *
96  * Receiving hammer2 service daemons which are able to match the label against
97  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
98  * to the target.  Removal of the file will cause a disconnection.  A failed
99  * public key negotiation stops further connection attempts until either the
100  * file is updated or the remote mtime is updated.
101  *
102  * Generally speaking this results in a web of connections, typically a
103  * combination of point-to-point for the more important links and relayed
104  * (spanning tree) for less important or filtered links.
105  *
106  * --
107  *
108  * The TCP listener serves as a rendezvous point in the cluster, accepting
109  * connections, performing registrations and authentications, maintaining
110  * the spanning tree, and keeping track of message state so disconnects can
111  * be handled properly.
112  *
113  * Once authenticated only low-level messaging protocols (which includes
114  * tracking persistent messages) are handled by this daemon.  This daemon
115  * does not run the higher level quorum or locking protocols.
116  *
117  * --
118  *
119  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
120  * to connect to (which do not have to be on the local lan).  This list will
121  * be retried until a connection can be established.  The file is not usually
122  * needed for linkages local to the LAN.
123  */
124 int
125 cmd_service(void)
126 {
127 	struct sockaddr_in lsin;
128 	int on;
129 	int lfd;
130 
131 	/*
132 	 * Acquire socket and set options
133 	 */
134 	if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
135 		fprintf(stderr, "master_listen: socket(): %s\n",
136 			strerror(errno));
137 		return 1;
138 	}
139 	on = 1;
140 	setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
141 
142 	/*
143 	 * Setup listen port and try to bind.  If the bind fails we assume
144 	 * that a master listener process is already running and silently
145 	 * fail.
146 	 */
147 	bzero(&lsin, sizeof(lsin));
148 	lsin.sin_family = AF_INET;
149 	lsin.sin_addr.s_addr = INADDR_ANY;
150 	lsin.sin_port = htons(DMSG_LISTEN_PORT);
151 	if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
152 		close(lfd);
153 		if (QuietOpt == 0) {
154 			fprintf(stderr,
155 				"master listen: daemon already running\n");
156 		}
157 		return 0;
158 	}
159 	if (QuietOpt == 0)
160 		fprintf(stderr, "master listen: startup\n");
161 	listen(lfd, 50);
162 
163 	/*
164 	 * Fork and disconnect the controlling terminal and parent process,
165 	 * executing the specified function as a pthread.
166 	 *
167 	 * Returns to the original process which can then continue running.
168 	 * In debug mode this call will create the pthread without forking
169 	 * and set NormalExit to 0, instead of fork.
170 	 */
171 	hammer2_demon(service_thread, (void *)(intptr_t)lfd);
172 	if (NormalExit)
173 		close(lfd);
174 	return 0;
175 }
176 
177 /*
178  * Master listen/accept thread.  Accept connections on the master socket,
179  * starting a pthread for each one.
180  */
181 static
182 void *
183 service_thread(void *data)
184 {
185 	struct sockaddr_in asin;
186 	socklen_t alen;
187 	pthread_t thread;
188 	dmsg_master_service_info_t *info;
189 	int lfd = (int)(intptr_t)data;
190 	int fd;
191 	int i;
192 	int count;
193 	struct statfs *mntbuf = NULL;
194 	struct statvfs *mntvbuf = NULL;
195 
196 	/*
197 	 * Nobody waits for us
198 	 */
199 	setproctitle("hammer2 master listen");
200 	pthread_detach(pthread_self());
201 
202 	dmsg_node_handler = service_node_handler;
203 
204 	/*
205 	 * Start up a thread to handle block device monitoring
206 	 */
207 	thread = NULL;
208 	pthread_create(&thread, NULL, udev_thread, NULL);
209 
210 	/*
211 	 * Start thread to manage /etc/hammer2/autoconn
212 	 */
213 	thread = NULL;
214 	pthread_create(&thread, NULL, autoconn_thread, NULL);
215 
216 	/*
217 	 * Scan existing hammer2 mounts and reconnect to them using
218 	 * HAMMER2IOC_RECLUSTER.
219 	 */
220 	count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
221 	for (i = 0; i < count; ++i) {
222 		if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
223 			master_reconnect(mntbuf[i].f_mntonname);
224 	}
225 
226 	/*
227 	 * Accept connections and create pthreads to handle them after
228 	 * validating the IP.
229 	 */
230 	for (;;) {
231 		alen = sizeof(asin);
232 		fd = accept(lfd, (struct sockaddr *)&asin, &alen);
233 		if (fd < 0) {
234 			if (errno == EINTR)
235 				continue;
236 			break;
237 		}
238 		thread = NULL;
239 		fprintf(stderr, "service_thread: accept fd %d\n", fd);
240 		info = malloc(sizeof(*info));
241 		bzero(info, sizeof(*info));
242 		info->fd = fd;
243 		info->detachme = 1;
244 		info->dbgmsg_callback = hammer2_shell_parse;
245 		info->label = strdup("client");
246 		pthread_create(&thread, NULL, dmsg_master_service, info);
247 	}
248 	return (NULL);
249 }
250 
251 /*
252  * Node discovery code on received SPANs (or loss of SPANs).  This code
253  * is used to track the availability of remote block devices and install
254  * or deinstall them using the xdisk driver (/dev/xdisk).
255  *
256  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
257  * the data handed to it.  When opened, a virtual circuit is forged and
258  * maintained to the block device server via DMSG.  Temporary failures
259  * stall the device until successfully reconnected or explicitly destroyed.
260  */
261 static
262 void
263 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
264 {
265 	struct service_node_opaque *info = *opaquep;
266 
267 	switch(op) {
268 	case DMSG_NODEOP_ADD:
269 		if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
270 			break;
271 		if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
272 			break;
273 		if (info == NULL) {
274 			info = malloc(sizeof(*info));
275 			bzero(info, sizeof(*info));
276 			*opaquep = info;
277 		}
278 		snprintf(info->cl_label, sizeof(info->cl_label),
279 			 "%s", msg->any.lnk_span.cl_label);
280 		snprintf(info->fs_label, sizeof(info->fs_label),
281 			 "%s", msg->any.lnk_span.fs_label);
282 		info->block = msg->any.lnk_span.media.block;
283 		fprintf(stderr, "NODE ADD %s serno %s\n",
284 			info->cl_label, info->fs_label);
285 		info->attached = 1;
286 		xdisk_reconnect(info);
287 		break;
288 	case DMSG_NODEOP_DEL:
289 		if (info) {
290 			fprintf(stderr, "NODE DEL %s serno %s\n",
291 				info->cl_label, info->fs_label);
292 			pthread_mutex_lock(&diskmtx);
293 			*opaquep = NULL;
294 			info->attached = 0;
295 			if (info->servicing == 0)
296 				free(info);
297 			else
298 				shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
299 			pthread_mutex_unlock(&diskmtx);
300 		}
301 		break;
302 	default:
303 		break;
304 	}
305 }
306 
307 /*
308  * Monitor block devices.  Currently polls every ~10 seconds or so.
309  */
310 static
311 void *
312 udev_thread(void *data __unused)
313 {
314 	int	fd;
315 	int	seq = 0;
316 
317 	pthread_detach(pthread_self());
318 
319 	if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
320 		fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
321 			UDEV_DEVICE_PATH);
322 		pthread_exit(NULL);
323 	}
324 	udev_check_disks();
325 	while (ioctl(fd, UDEVWAIT, &seq) == 0) {
326 		udev_check_disks();
327 		sleep(1);
328 	}
329 	return (NULL);
330 }
331 
332 static void *autoconn_connect_thread(void *data);
333 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
334 
335 static
336 void *
337 autoconn_thread(void *data __unused)
338 {
339 	TAILQ_HEAD(, autoconn) autolist;
340 	struct autoconn *ac;
341 	struct autoconn *next;
342 	pthread_t thread;
343 	struct stat st;
344 	time_t	t;
345 	time_t	lmod;
346 	int	found_last;
347 	FILE	*fp;
348 	char	buf[256];
349 
350 	TAILQ_INIT(&autolist);
351 	found_last = 0;
352 	lmod = 0;
353 
354 	pthread_detach(pthread_self());
355 	for (;;) {
356 		/*
357 		 * Polling interval
358 		 */
359 		sleep(5);
360 
361 		/*
362 		 * Poll the file.  Loop up if the synchronized state (lmod)
363 		 * has not changed.
364 		 */
365 		if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
366 			if (lmod == st.st_mtime)
367 				continue;
368 			fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
369 			if (fp == NULL)
370 				continue;
371 		} else {
372 			if (lmod == 0)
373 				continue;
374 			fp = NULL;
375 		}
376 
377 		/*
378 		 * Wait at least 5 seconds after the file is created or
379 		 * removed.
380 		 *
381 		 * Do not update the synchronized state.
382 		 */
383 		if (fp == NULL && found_last) {
384 			found_last = 0;
385 			continue;
386 		} else if (fp && found_last == 0) {
387 			fclose(fp);
388 			found_last = 1;
389 			continue;
390 		}
391 
392 		/*
393 		 * Don't scan the file until the time progresses past the
394 		 * file's mtime, so we can validate that the file was not
395 		 * further modified during our scan.
396 		 *
397 		 * Do not update the synchronized state.
398 		 */
399 		time(&t);
400 		if (fp) {
401 			if (t == st.st_mtime) {
402 				fclose(fp);
403 				continue;
404 			}
405 			t = st.st_mtime;
406 		} else {
407 			t = 0;
408 		}
409 
410 		/*
411 		 * Set staging to disconnect, then scan the file.
412 		 */
413 		TAILQ_FOREACH(ac, &autolist, entry)
414 			ac->stage = 0;
415 		while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
416 			char *host;
417 
418 			if ((host = strtok(buf, " \t\r\n")) == NULL ||
419 			    host[0] == '#') {
420 				continue;
421 			}
422 			TAILQ_FOREACH(ac, &autolist, entry) {
423 				if (strcmp(host, ac->host) == 0)
424 					break;
425 			}
426 			if (ac == NULL) {
427 				ac = malloc(sizeof(*ac));
428 				bzero(ac, sizeof(*ac));
429 				ac->host = strdup(host);
430 				ac->state = AUTOCONN_INACTIVE;
431 				TAILQ_INSERT_TAIL(&autolist, ac, entry);
432 			}
433 			ac->stage = 1;
434 		}
435 
436 		/*
437 		 * Ignore the scan (and retry again) if the file was
438 		 * modified during the scan.
439 		 *
440 		 * Do not update the synchronized state.
441 		 */
442 		if (fp) {
443 			if (fstat(fileno(fp), &st) < 0) {
444 				fclose(fp);
445 				continue;
446 			}
447 			fclose(fp);
448 			if (t != st.st_mtime)
449 				continue;
450 		}
451 
452 		/*
453 		 * Update the synchronized state and reconfigure the
454 		 * connect list as needed.
455 		 */
456 		lmod = t;
457 		next = TAILQ_FIRST(&autolist);
458 		while ((ac = next) != NULL) {
459 			next = TAILQ_NEXT(ac, entry);
460 
461 			/*
462 			 * Staging, initiate
463 			 */
464 			if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
465 				if (pipe(ac->pipefd) == 0) {
466 					ac->stopme = 0;
467 					ac->state = AUTOCONN_ACTIVE;
468 					thread = NULL;
469 					pthread_create(&thread, NULL,
470 						       autoconn_connect_thread,
471 						       ac);
472 				}
473 			}
474 
475 			/*
476 			 * Unstaging, stop active connection.
477 			 *
478 			 * We write to the pipe which causes the iocom_core
479 			 * to call autoconn_disconnect_signal().
480 			 */
481 			if (ac->stage == 0 &&
482 			    ac->state == AUTOCONN_ACTIVE) {
483 				if (ac->stopme == 0) {
484 					char dummy = 0;
485 					ac->stopme = 1;
486 					write(ac->pipefd[1], &dummy, 1);
487 				}
488 			}
489 
490 			/*
491 			 * Unstaging, delete inactive connection.
492 			 */
493 			if (ac->stage == 0 &&
494 			    ac->state == AUTOCONN_INACTIVE) {
495 				TAILQ_REMOVE(&autolist, ac, entry);
496 				free(ac->host);
497 				free(ac);
498 				continue;
499 			}
500 		}
501 		sleep(5);
502 	}
503 	return(NULL);
504 }
505 
506 static
507 void *
508 autoconn_connect_thread(void *data)
509 {
510 	dmsg_master_service_info_t *info;
511 	struct autoconn *ac;
512 	void *res;
513 	int fd;
514 
515 	ac = data;
516 	pthread_detach(pthread_self());
517 
518 	while (ac->stopme == 0) {
519 		fd = dmsg_connect(ac->host);
520 		if (fd < 0) {
521 			fprintf(stderr, "autoconn: Connect failure: %s\n",
522 				ac->host);
523 			sleep(5);
524 			continue;
525 		}
526 		fprintf(stderr, "autoconn: Connect %s\n", ac->host);
527 
528 		info = malloc(sizeof(*info));
529 		bzero(info, sizeof(*info));
530 		info->fd = fd;
531 		info->altfd = ac->pipefd[0];
532 		info->altmsg_callback = autoconn_disconnect_signal;
533 		info->detachme = 0;
534 		info->noclosealt = 1;
535 		pthread_create(&ac->thread, NULL, dmsg_master_service, info);
536 		pthread_join(ac->thread, &res);
537 	}
538 	close(ac->pipefd[0]);
539 	ac->state = AUTOCONN_INACTIVE;
540 	/* auto structure can be ripped out here */
541 	return(NULL);
542 }
543 
544 static
545 void
546 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
547 {
548 	fprintf(stderr, "autoconn: Shutting down socket\n");
549 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
550 }
551 
552 /*
553  * Retrieve the list of disk attachments and attempt to export
554  * them.
555  */
556 static
557 void
558 udev_check_disks(void)
559 {
560 	char tmpbuf[1024];
561 	char *buf = NULL;
562 	char *disk;
563 	int error;
564 	size_t n;
565 
566 	for (;;) {
567 		n = 0;
568 		error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
569 		if (error < 0 || n == 0)
570 			break;
571 		if (n >= sizeof(tmpbuf))
572 			buf = malloc(n + 1);
573 		else
574 			buf = tmpbuf;
575 		error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
576 		if (error == 0) {
577 			buf[n] = 0;
578 			break;
579 		}
580 		if (buf != tmpbuf) {
581 			free(buf);
582 			buf = NULL;
583 		}
584 		if (errno != ENOMEM)
585 			break;
586 	}
587 	if (buf) {
588 		fprintf(stderr, "DISKS: %s\n", buf);
589 		for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
590 			disk_reconnect(disk);
591 		}
592 		if (buf != tmpbuf)
593 			free(buf);
594 	}
595 }
596 
597 /*
598  * Normally the mount program supplies a cluster communications
599  * descriptor to the hammer2 vfs on mount, but if you kill the service
600  * daemon and restart it that link will be lost.
601  *
602  * This procedure attempts to [re]connect to existing mounts when
603  * the service daemon is started up before going into its accept
604  * loop.
605  *
606  * NOTE: A hammer2 mount point can only accomodate one connection at a time
607  *	 so this will disconnect any existing connection during the
608  *	 reconnect.
609  */
610 static
611 void
612 master_reconnect(const char *mntpt)
613 {
614 	struct hammer2_ioc_recluster recls;
615 	dmsg_master_service_info_t *info;
616 	pthread_t thread;
617 	int fd;
618 	int pipefds[2];
619 
620 	fd = open(mntpt, O_RDONLY);
621 	if (fd < 0) {
622 		fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
623 		return;
624 	}
625 	if (pipe(pipefds) < 0) {
626 		fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
627 		close(fd);
628 		return;
629 	}
630 	bzero(&recls, sizeof(recls));
631 	recls.fd = pipefds[0];
632 	if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
633 		fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
634 		close(pipefds[0]);
635 		close(pipefds[1]);
636 		close(fd);
637 		return;
638 	}
639 	close(pipefds[0]);
640 	close(fd);
641 
642 	info = malloc(sizeof(*info));
643 	bzero(info, sizeof(*info));
644 	info->fd = pipefds[1];
645 	info->detachme = 1;
646 	info->dbgmsg_callback = hammer2_shell_parse;
647 	info->label = strdup("hammer2");
648 	pthread_create(&thread, NULL, dmsg_master_service, info);
649 }
650 
651 /*
652  * Reconnect a physical disk service to the mesh.
653  */
654 static
655 void
656 disk_reconnect(const char *disk)
657 {
658 	struct disk_ioc_recluster recls;
659 	struct diskcon *dc;
660 	dmsg_master_service_info_t *info;
661 	pthread_t thread;
662 	int fd;
663 	int pipefds[2];
664 	char *path;
665 
666 	/*
667 	 * Urm, this will auto-create mdX+1, just ignore for now.
668 	 * This mechanic needs to be fixed.  It might actually be nice
669 	 * to be able to export md disks.
670 	 */
671 	if (strncmp(disk, "md", 2) == 0)
672 		return;
673 	if (strncmp(disk, "xa", 2) == 0)
674 		return;
675 
676 	/*
677 	 * Check if already connected
678 	 */
679 	pthread_mutex_lock(&diskmtx);
680 	TAILQ_FOREACH(dc, &diskconq, entry) {
681 		if (strcmp(dc->disk, disk) == 0)
682 			break;
683 	}
684 	pthread_mutex_unlock(&diskmtx);
685 	if (dc)
686 		return;
687 
688 	/*
689 	 * Not already connected, create a connection to the kernel
690 	 * disk driver.
691 	 */
692 	asprintf(&path, "/dev/%s", disk);
693 	fd = open(path, O_RDONLY);
694 	if (fd < 0) {
695 		fprintf(stderr, "reconnect %s: no access to disk\n", disk);
696 		free(path);
697 		return;
698 	}
699 	free(path);
700 	if (pipe(pipefds) < 0) {
701 		fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
702 		close(fd);
703 		return;
704 	}
705 	bzero(&recls, sizeof(recls));
706 	recls.fd = pipefds[0];
707 	if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
708 		fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
709 		close(pipefds[0]);
710 		close(pipefds[1]);
711 		close(fd);
712 		return;
713 	}
714 	close(pipefds[0]);
715 	close(fd);
716 
717 	dc = malloc(sizeof(*dc));
718 	dc->disk = strdup(disk);
719 	pthread_mutex_lock(&diskmtx);
720 	TAILQ_INSERT_TAIL(&diskconq, dc, entry);
721 	pthread_mutex_unlock(&diskmtx);
722 
723 	info = malloc(sizeof(*info));
724 	bzero(info, sizeof(*info));
725 	info->fd = pipefds[1];
726 	info->detachme = 1;
727 	info->dbgmsg_callback = hammer2_shell_parse;
728 	info->exit_callback = disk_disconnect;
729 	info->handle = dc;
730 	info->label = strdup(dc->disk);
731 	pthread_create(&thread, NULL, dmsg_master_service, info);
732 }
733 
734 static
735 void
736 disk_disconnect(void *handle)
737 {
738 	struct diskcon *dc = handle;
739 
740 	fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
741 
742 	pthread_mutex_lock(&diskmtx);
743 	TAILQ_REMOVE(&diskconq, dc, entry);
744 	pthread_mutex_unlock(&diskmtx);
745 	free(dc->disk);
746 	free(dc);
747 }
748 
749 /*
750  * [re]connect a remote disk service to the local system via /dev/xdisk.
751  */
752 static
753 void
754 xdisk_reconnect(struct service_node_opaque *xdisk)
755 {
756 	struct xdisk_attach_ioctl *xaioc;
757 	dmsg_master_service_info_t *info;
758 	pthread_t thread;
759 	int pipefds[2];
760 
761 	if (pipe(pipefds) < 0) {
762 		fprintf(stderr, "reconnect %s: pipe() failed\n",
763 			xdisk->cl_label);
764 		return;
765 	}
766 
767 	info = malloc(sizeof(*info));
768 	bzero(info, sizeof(*info));
769 	info->fd = pipefds[1];
770 	info->detachme = 1;
771 	info->dbgmsg_callback = hammer2_shell_parse;
772 	info->exit_callback = xdisk_disconnect;
773 	info->handle = xdisk;
774 	xdisk->servicing = 1;
775 	xdisk->servicefd = info->fd;
776 	info->label = strdup(xdisk->cl_label);
777 	pthread_create(&thread, NULL, dmsg_master_service, info);
778 
779 	/*
780 	 * We have to run the attach in its own pthread because it will
781 	 * synchronously interact with the messaging subsystem over the
782 	 * pipe.  If we do it here we will deadlock.
783 	 */
784 	xaioc = malloc(sizeof(*xaioc));
785 	bzero(xaioc, sizeof(*xaioc));
786 	snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
787 		 "%s", xdisk->cl_label);
788 	snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
789 		 "X-%s", xdisk->fs_label);
790 	xaioc->bytes = xdisk->block.bytes;
791 	xaioc->blksize = xdisk->block.blksize;
792 	xaioc->fd = pipefds[0];
793 
794 	pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
795 }
796 
797 static
798 void *
799 xdisk_attach_tmpthread(void *data)
800 {
801 	struct xdisk_attach_ioctl *xaioc = data;
802 	int fd;
803 
804 	pthread_detach(pthread_self());
805 
806 	fd = open("/dev/xdisk", O_RDWR, 0600);
807 	if (fd < 0) {
808 		fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
809 	}
810 	if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
811 		fprintf(stderr, "reconnect %s: xdisk attach failed\n",
812 			xaioc->cl_label);
813 	}
814 	close(xaioc->fd);
815 	close(fd);
816 	return (NULL);
817 }
818 
819 static
820 void
821 xdisk_disconnect(void *handle)
822 {
823 	struct service_node_opaque *info = handle;
824 
825 	assert(info->servicing == 1);
826 
827 	pthread_mutex_lock(&diskmtx);
828 	info->servicing = 0;
829 	if (info->attached == 0)
830 		free(info);
831 	pthread_mutex_unlock(&diskmtx);
832 }
833