xref: /dragonfly/sbin/hammer2/cmd_service.c (revision 3170ffd7)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "hammer2.h"
37 
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40 
41 struct diskcon {
42 	TAILQ_ENTRY(diskcon) entry;
43 	char	*disk;
44 };
45 
46 struct service_node_opaque {
47 	char	cl_label[64];
48 	char	fs_label[64];
49 	dmsg_media_block_t block;
50 	int	attached;
51 	int	servicing;
52 	int	servicefd;
53 };
54 
55 struct autoconn {
56 	TAILQ_ENTRY(autoconn) entry;
57 	char	*host;
58 	int	stage;
59 	int	stopme;
60 	int	pipefd[2];	/* {read,write} */
61 	enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
62 	pthread_t thread;
63 };
64 
65 #define WS " \r\n"
66 
67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
68 pthread_mutex_t diskmtx;
69 
70 static void *service_thread(void *data);
71 static void *udev_thread(void *data);
72 static void *autoconn_thread(void *data);
73 static void master_reconnect(const char *mntpt);
74 static void disk_reconnect(const char *disk);
75 static void disk_disconnect(void *handle);
76 static void udev_check_disks(void);
77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
78 
79 static void xdisk_reconnect(struct service_node_opaque *info);
80 static void xdisk_disconnect(void *handle);
81 static void *xdisk_attach_tmpthread(void *data);
82 
83 /*
84  * Start-up the master listener daemon for the machine.
85  *
86  * The master listener serves as a rendezvous point in the cluster, accepting
87  * connections, performing registrations and authentications, maintaining
88  * the spanning tree, and keeping track of message state so disconnects can
89  * be handled properly.
90  *
91  * Once authenticated only low-level messaging protocols (which includes
92  * tracking persistent messages) are handled by this daemon.  This daemon
93  * does not run the higher level quorum or locking protocols.
94  *
95  * This daemon can also be told to maintain connections to other nodes,
96  * forming a messaging backbone, which in turn allows PFS's (if desired) to
97  * simply connect to the master daemon via localhost if desired.
98  * Backbones are specified via /etc/hammer2.conf.
99  */
100 int
101 cmd_service(void)
102 {
103 	struct sockaddr_in lsin;
104 	int on;
105 	int lfd;
106 
107 	/*
108 	 * Acquire socket and set options
109 	 */
110 	if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
111 		fprintf(stderr, "master_listen: socket(): %s\n",
112 			strerror(errno));
113 		return 1;
114 	}
115 	on = 1;
116 	setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
117 
118 	/*
119 	 * Setup listen port and try to bind.  If the bind fails we assume
120 	 * that a master listener process is already running and silently
121 	 * fail.
122 	 */
123 	bzero(&lsin, sizeof(lsin));
124 	lsin.sin_family = AF_INET;
125 	lsin.sin_addr.s_addr = INADDR_ANY;
126 	lsin.sin_port = htons(DMSG_LISTEN_PORT);
127 	if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
128 		close(lfd);
129 		if (QuietOpt == 0) {
130 			fprintf(stderr,
131 				"master listen: daemon already running\n");
132 		}
133 		return 0;
134 	}
135 	if (QuietOpt == 0)
136 		fprintf(stderr, "master listen: startup\n");
137 	listen(lfd, 50);
138 
139 	/*
140 	 * Fork and disconnect the controlling terminal and parent process,
141 	 * executing the specified function as a pthread.
142 	 *
143 	 * Returns to the original process which can then continue running.
144 	 * In debug mode this call will create the pthread without forking
145 	 * and set NormalExit to 0, instead of fork.
146 	 */
147 	hammer2_demon(service_thread, (void *)(intptr_t)lfd);
148 	if (NormalExit)
149 		close(lfd);
150 	return 0;
151 }
152 
153 /*
154  * Master listen/accept thread.  Accept connections on the master socket,
155  * starting a pthread for each one.
156  */
157 static
158 void *
159 service_thread(void *data)
160 {
161 	struct sockaddr_in asin;
162 	socklen_t alen;
163 	pthread_t thread;
164 	dmsg_master_service_info_t *info;
165 	int lfd = (int)(intptr_t)data;
166 	int fd;
167 	int i;
168 	int count;
169 	struct statfs *mntbuf = NULL;
170 	struct statvfs *mntvbuf = NULL;
171 
172 	/*
173 	 * Nobody waits for us
174 	 */
175 	setproctitle("hammer2 master listen");
176 	pthread_detach(pthread_self());
177 
178 	dmsg_node_handler = service_node_handler;
179 
180 	/*
181 	 * Start up a thread to handle block device monitoring
182 	 */
183 	thread = NULL;
184 	pthread_create(&thread, NULL, udev_thread, NULL);
185 
186 	/*
187 	 * Start thread to manage /etc/hammer2/autoconn
188 	 */
189 	thread = NULL;
190 	pthread_create(&thread, NULL, autoconn_thread, NULL);
191 
192 	/*
193 	 * Scan existing hammer2 mounts and reconnect to them using
194 	 * HAMMER2IOC_RECLUSTER.
195 	 */
196 	count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
197 	for (i = 0; i < count; ++i) {
198 		if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
199 			master_reconnect(mntbuf[i].f_mntonname);
200 	}
201 
202 	/*
203 	 * Accept connections and create pthreads to handle them after
204 	 * validating the IP.
205 	 */
206 	for (;;) {
207 		alen = sizeof(asin);
208 		fd = accept(lfd, (struct sockaddr *)&asin, &alen);
209 		if (fd < 0) {
210 			if (errno == EINTR)
211 				continue;
212 			break;
213 		}
214 		thread = NULL;
215 		fprintf(stderr, "service_thread: accept fd %d\n", fd);
216 		info = malloc(sizeof(*info));
217 		bzero(info, sizeof(*info));
218 		info->fd = fd;
219 		info->detachme = 1;
220 		info->dbgmsg_callback = hammer2_shell_parse;
221 		info->label = strdup("client");
222 		pthread_create(&thread, NULL, dmsg_master_service, info);
223 	}
224 	return (NULL);
225 }
226 
227 /*
228  * Node discovery code on received SPANs (or loss of SPANs).  This code
229  * is used to track the availability of remote block devices and install
230  * or deinstall them using the xdisk driver (/dev/xdisk).
231  *
232  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
233  * the data handed to it.  When opened, a virtual circuit is forged and
234  * maintained to the block device server via DMSG.  Temporary failures
235  * stall the device until successfully reconnected or explicitly destroyed.
236  */
237 static
238 void
239 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
240 {
241 	struct service_node_opaque *info = *opaquep;
242 
243 	switch(op) {
244 	case DMSG_NODEOP_ADD:
245 		if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
246 			break;
247 		if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
248 			break;
249 		if (info == NULL) {
250 			info = malloc(sizeof(*info));
251 			bzero(info, sizeof(*info));
252 			*opaquep = info;
253 		}
254 		snprintf(info->cl_label, sizeof(info->cl_label),
255 			 "%s", msg->any.lnk_span.cl_label);
256 		snprintf(info->fs_label, sizeof(info->fs_label),
257 			 "%s", msg->any.lnk_span.fs_label);
258 		info->block = msg->any.lnk_span.media.block;
259 		fprintf(stderr, "NODE ADD %s serno %s\n",
260 			info->cl_label, info->fs_label);
261 		info->attached = 1;
262 		xdisk_reconnect(info);
263 		break;
264 	case DMSG_NODEOP_DEL:
265 		if (info) {
266 			fprintf(stderr, "NODE DEL %s serno %s\n",
267 				info->cl_label, info->fs_label);
268 			pthread_mutex_lock(&diskmtx);
269 			*opaquep = NULL;
270 			info->attached = 0;
271 			if (info->servicing == 0)
272 				free(info);
273 			else
274 				shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
275 			pthread_mutex_unlock(&diskmtx);
276 		}
277 		break;
278 	default:
279 		break;
280 	}
281 }
282 
283 /*
284  * Monitor block devices.  Currently polls every ~10 seconds or so.
285  */
286 static
287 void *
288 udev_thread(void *data __unused)
289 {
290 	int	fd;
291 	int	seq = 0;
292 
293 	pthread_detach(pthread_self());
294 
295 	if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
296 		fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
297 			UDEV_DEVICE_PATH);
298 		pthread_exit(NULL);
299 	}
300 	udev_check_disks();
301 	while (ioctl(fd, UDEVWAIT, &seq) == 0) {
302 		udev_check_disks();
303 		sleep(1);
304 	}
305 	return (NULL);
306 }
307 
308 static void *autoconn_connect_thread(void *data);
309 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
310 
311 static
312 void *
313 autoconn_thread(void *data __unused)
314 {
315 	TAILQ_HEAD(, autoconn) autolist;
316 	struct autoconn *ac;
317 	struct autoconn *next;
318 	pthread_t thread;
319 	struct stat st;
320 	time_t	t;
321 	time_t	lmod;
322 	int	found_last;
323 	FILE	*fp;
324 	char	buf[256];
325 
326 	TAILQ_INIT(&autolist);
327 	found_last = 0;
328 	lmod = 0;
329 
330 	pthread_detach(pthread_self());
331 	for (;;) {
332 		/*
333 		 * Polling interval
334 		 */
335 		sleep(5);
336 
337 		/*
338 		 * Poll the file.  Loop up if the synchronized state (lmod)
339 		 * has not changed.
340 		 */
341 		if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
342 			if (lmod == st.st_mtime)
343 				continue;
344 			fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
345 			if (fp == NULL)
346 				continue;
347 		} else {
348 			if (lmod == 0)
349 				continue;
350 			fp = NULL;
351 		}
352 
353 		/*
354 		 * Wait at least 5 seconds after the file is created or
355 		 * removed.
356 		 *
357 		 * Do not update the synchronized state.
358 		 */
359 		if (fp == NULL && found_last) {
360 			found_last = 0;
361 			continue;
362 		} else if (fp && found_last == 0) {
363 			fclose(fp);
364 			found_last = 1;
365 			continue;
366 		}
367 
368 		/*
369 		 * Don't scan the file until the time progresses past the
370 		 * file's mtime, so we can validate that the file was not
371 		 * further modified during our scan.
372 		 *
373 		 * Do not update the synchronized state.
374 		 */
375 		time(&t);
376 		if (fp) {
377 			if (t == st.st_mtime) {
378 				fclose(fp);
379 				continue;
380 			}
381 			t = st.st_mtime;
382 		} else {
383 			t = 0;
384 		}
385 
386 		/*
387 		 * Set staging to disconnect, then scan the file.
388 		 */
389 		TAILQ_FOREACH(ac, &autolist, entry)
390 			ac->stage = 0;
391 		while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
392 			char *host;
393 
394 			if ((host = strtok(buf, " \t\r\n")) == NULL ||
395 			    host[0] == '#') {
396 				continue;
397 			}
398 			TAILQ_FOREACH(ac, &autolist, entry) {
399 				if (strcmp(host, ac->host) == 0)
400 					break;
401 			}
402 			if (ac == NULL) {
403 				ac = malloc(sizeof(*ac));
404 				bzero(ac, sizeof(*ac));
405 				ac->host = strdup(host);
406 				ac->state = AUTOCONN_INACTIVE;
407 				TAILQ_INSERT_TAIL(&autolist, ac, entry);
408 			}
409 			ac->stage = 1;
410 		}
411 
412 		/*
413 		 * Ignore the scan (and retry again) if the file was
414 		 * modified during the scan.
415 		 *
416 		 * Do not update the synchronized state.
417 		 */
418 		if (fp) {
419 			if (fstat(fileno(fp), &st) < 0) {
420 				fclose(fp);
421 				continue;
422 			}
423 			fclose(fp);
424 			if (t != st.st_mtime)
425 				continue;
426 		}
427 
428 		/*
429 		 * Update the synchronized state and reconfigure the
430 		 * connect list as needed.
431 		 */
432 		lmod = t;
433 		next = TAILQ_FIRST(&autolist);
434 		while ((ac = next) != NULL) {
435 			next = TAILQ_NEXT(ac, entry);
436 
437 			/*
438 			 * Staging, initiate
439 			 */
440 			if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
441 				if (pipe(ac->pipefd) == 0) {
442 					ac->stopme = 0;
443 					ac->state = AUTOCONN_ACTIVE;
444 					thread = NULL;
445 					pthread_create(&thread, NULL,
446 						       autoconn_connect_thread,
447 						       ac);
448 				}
449 			}
450 
451 			/*
452 			 * Unstaging, stop active connection.
453 			 *
454 			 * We write to the pipe which causes the iocom_core
455 			 * to call autoconn_disconnect_signal().
456 			 */
457 			if (ac->stage == 0 &&
458 			    ac->state == AUTOCONN_ACTIVE) {
459 				if (ac->stopme == 0) {
460 					char dummy = 0;
461 					ac->stopme = 1;
462 					write(ac->pipefd[1], &dummy, 1);
463 				}
464 			}
465 
466 			/*
467 			 * Unstaging, delete inactive connection.
468 			 */
469 			if (ac->stage == 0 &&
470 			    ac->state == AUTOCONN_INACTIVE) {
471 				TAILQ_REMOVE(&autolist, ac, entry);
472 				free(ac->host);
473 				free(ac);
474 				continue;
475 			}
476 		}
477 		sleep(5);
478 	}
479 	return(NULL);
480 }
481 
482 static
483 void *
484 autoconn_connect_thread(void *data)
485 {
486 	dmsg_master_service_info_t *info;
487 	struct autoconn *ac;
488 	void *res;
489 	int fd;
490 
491 	ac = data;
492 	pthread_detach(pthread_self());
493 
494 	while (ac->stopme == 0) {
495 		fd = dmsg_connect(ac->host);
496 		if (fd < 0) {
497 			fprintf(stderr, "autoconn: Connect failure: %s\n",
498 				ac->host);
499 			sleep(5);
500 			continue;
501 		}
502 		fprintf(stderr, "autoconn: Connect %s\n", ac->host);
503 
504 		info = malloc(sizeof(*info));
505 		bzero(info, sizeof(*info));
506 		info->fd = fd;
507 		info->altfd = ac->pipefd[0];
508 		info->altmsg_callback = autoconn_disconnect_signal;
509 		info->detachme = 0;
510 		info->noclosealt = 1;
511 		pthread_create(&ac->thread, NULL, dmsg_master_service, info);
512 		pthread_join(ac->thread, &res);
513 	}
514 	close(ac->pipefd[0]);
515 	ac->state = AUTOCONN_INACTIVE;
516 	/* auto structure can be ripped out here */
517 	return(NULL);
518 }
519 
520 static
521 void
522 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
523 {
524 	fprintf(stderr, "autoconn: Shutting down socket\n");
525 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
526 }
527 
528 /*
529  * Retrieve the list of disk attachments and attempt to export
530  * them.
531  */
532 static
533 void
534 udev_check_disks(void)
535 {
536 	char tmpbuf[1024];
537 	char *buf = NULL;
538 	char *disk;
539 	int error;
540 	size_t n;
541 
542 	for (;;) {
543 		n = 0;
544 		error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
545 		if (error < 0 || n == 0)
546 			break;
547 		if (n >= sizeof(tmpbuf))
548 			buf = malloc(n + 1);
549 		else
550 			buf = tmpbuf;
551 		error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
552 		if (error == 0) {
553 			buf[n] = 0;
554 			break;
555 		}
556 		if (buf != tmpbuf) {
557 			free(buf);
558 			buf = NULL;
559 		}
560 		if (errno != ENOMEM)
561 			break;
562 	}
563 	if (buf) {
564 		fprintf(stderr, "DISKS: %s\n", buf);
565 		for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
566 			disk_reconnect(disk);
567 		}
568 		if (buf != tmpbuf)
569 			free(buf);
570 	}
571 }
572 
573 /*
574  * Normally the mount program supplies a cluster communications
575  * descriptor to the hammer2 vfs on mount, but if you kill the service
576  * daemon and restart it that link will be lost.
577  *
578  * This procedure attempts to [re]connect to existing mounts when
579  * the service daemon is started up before going into its accept
580  * loop.
581  *
582  * NOTE: A hammer2 mount point can only accomodate one connection at a time
583  *	 so this will disconnect any existing connection during the
584  *	 reconnect.
585  */
586 static
587 void
588 master_reconnect(const char *mntpt)
589 {
590 	struct hammer2_ioc_recluster recls;
591 	dmsg_master_service_info_t *info;
592 	pthread_t thread;
593 	int fd;
594 	int pipefds[2];
595 
596 	fd = open(mntpt, O_RDONLY);
597 	if (fd < 0) {
598 		fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
599 		return;
600 	}
601 	if (pipe(pipefds) < 0) {
602 		fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
603 		close(fd);
604 		return;
605 	}
606 	bzero(&recls, sizeof(recls));
607 	recls.fd = pipefds[0];
608 	if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
609 		fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
610 		close(pipefds[0]);
611 		close(pipefds[1]);
612 		close(fd);
613 		return;
614 	}
615 	close(pipefds[0]);
616 	close(fd);
617 
618 	info = malloc(sizeof(*info));
619 	bzero(info, sizeof(*info));
620 	info->fd = pipefds[1];
621 	info->detachme = 1;
622 	info->dbgmsg_callback = hammer2_shell_parse;
623 	info->label = strdup("hammer2");
624 	pthread_create(&thread, NULL, dmsg_master_service, info);
625 }
626 
627 /*
628  * Reconnect a physical disk service to the mesh.
629  */
630 static
631 void
632 disk_reconnect(const char *disk)
633 {
634 	struct disk_ioc_recluster recls;
635 	struct diskcon *dc;
636 	dmsg_master_service_info_t *info;
637 	pthread_t thread;
638 	int fd;
639 	int pipefds[2];
640 	char *path;
641 
642 	/*
643 	 * Urm, this will auto-create mdX+1, just ignore for now.
644 	 * This mechanic needs to be fixed.  It might actually be nice
645 	 * to be able to export md disks.
646 	 */
647 	if (strncmp(disk, "md", 2) == 0)
648 		return;
649 	if (strncmp(disk, "xa", 2) == 0)
650 		return;
651 
652 	/*
653 	 * Check if already connected
654 	 */
655 	pthread_mutex_lock(&diskmtx);
656 	TAILQ_FOREACH(dc, &diskconq, entry) {
657 		if (strcmp(dc->disk, disk) == 0)
658 			break;
659 	}
660 	pthread_mutex_unlock(&diskmtx);
661 	if (dc)
662 		return;
663 
664 	/*
665 	 * Not already connected, create a connection to the kernel
666 	 * disk driver.
667 	 */
668 	asprintf(&path, "/dev/%s", disk);
669 	fd = open(path, O_RDONLY);
670 	if (fd < 0) {
671 		fprintf(stderr, "reconnect %s: no access to disk\n", disk);
672 		free(path);
673 		return;
674 	}
675 	free(path);
676 	if (pipe(pipefds) < 0) {
677 		fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
678 		close(fd);
679 		return;
680 	}
681 	bzero(&recls, sizeof(recls));
682 	recls.fd = pipefds[0];
683 	if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
684 		fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
685 		close(pipefds[0]);
686 		close(pipefds[1]);
687 		close(fd);
688 		return;
689 	}
690 	close(pipefds[0]);
691 	close(fd);
692 
693 	dc = malloc(sizeof(*dc));
694 	dc->disk = strdup(disk);
695 	pthread_mutex_lock(&diskmtx);
696 	TAILQ_INSERT_TAIL(&diskconq, dc, entry);
697 	pthread_mutex_unlock(&diskmtx);
698 
699 	info = malloc(sizeof(*info));
700 	bzero(info, sizeof(*info));
701 	info->fd = pipefds[1];
702 	info->detachme = 1;
703 	info->dbgmsg_callback = hammer2_shell_parse;
704 	info->exit_callback = disk_disconnect;
705 	info->handle = dc;
706 	info->label = strdup(dc->disk);
707 	pthread_create(&thread, NULL, dmsg_master_service, info);
708 }
709 
710 static
711 void
712 disk_disconnect(void *handle)
713 {
714 	struct diskcon *dc = handle;
715 
716 	fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
717 
718 	pthread_mutex_lock(&diskmtx);
719 	TAILQ_REMOVE(&diskconq, dc, entry);
720 	pthread_mutex_unlock(&diskmtx);
721 	free(dc->disk);
722 	free(dc);
723 }
724 
725 /*
726  * [re]connect a remote disk service to the local system via /dev/xdisk.
727  */
728 static
729 void
730 xdisk_reconnect(struct service_node_opaque *xdisk)
731 {
732 	struct xdisk_attach_ioctl *xaioc;
733 	dmsg_master_service_info_t *info;
734 	pthread_t thread;
735 	int pipefds[2];
736 
737 	if (pipe(pipefds) < 0) {
738 		fprintf(stderr, "reconnect %s: pipe() failed\n",
739 			xdisk->cl_label);
740 		return;
741 	}
742 
743 	info = malloc(sizeof(*info));
744 	bzero(info, sizeof(*info));
745 	info->fd = pipefds[1];
746 	info->detachme = 1;
747 	info->dbgmsg_callback = hammer2_shell_parse;
748 	info->exit_callback = xdisk_disconnect;
749 	info->handle = xdisk;
750 	xdisk->servicing = 1;
751 	xdisk->servicefd = info->fd;
752 	info->label = strdup(xdisk->cl_label);
753 	pthread_create(&thread, NULL, dmsg_master_service, info);
754 
755 	/*
756 	 * We have to run the attach in its own pthread because it will
757 	 * synchronously interact with the messaging subsystem over the
758 	 * pipe.  If we do it here we will deadlock.
759 	 */
760 	xaioc = malloc(sizeof(*xaioc));
761 	bzero(xaioc, sizeof(xaioc));
762 	snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
763 		 "%s", xdisk->cl_label);
764 	snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
765 		 "X-%s", xdisk->fs_label);
766 	xaioc->bytes = xdisk->block.bytes;
767 	xaioc->blksize = xdisk->block.blksize;
768 	xaioc->fd = pipefds[0];
769 
770 	pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
771 }
772 
773 static
774 void *
775 xdisk_attach_tmpthread(void *data)
776 {
777 	struct xdisk_attach_ioctl *xaioc = data;
778 	int fd;
779 
780 	pthread_detach(pthread_self());
781 
782 	fd = open("/dev/xdisk", O_RDWR, 0600);
783 	if (fd < 0) {
784 		fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
785 	}
786 	if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
787 		fprintf(stderr, "reconnect %s: xdisk attach failed\n",
788 			xaioc->cl_label);
789 	}
790 	close(xaioc->fd);
791 	close(fd);
792 	return (NULL);
793 }
794 
795 static
796 void
797 xdisk_disconnect(void *handle)
798 {
799 	struct service_node_opaque *info = handle;
800 
801 	assert(info->servicing == 1);
802 
803 	pthread_mutex_lock(&diskmtx);
804 	info->servicing = 0;
805 	if (info->attached == 0)
806 		free(info);
807 	pthread_mutex_unlock(&diskmtx);
808 }
809