1 /* 2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 34 */ 35 36 #include "hammer2.h" 37 38 #include <sys/xdiskioctl.h> 39 #include <machine/atomic.h> 40 41 struct diskcon { 42 TAILQ_ENTRY(diskcon) entry; 43 char *disk; 44 }; 45 46 struct service_node_opaque { 47 char cl_label[64]; 48 char fs_label[64]; 49 dmsg_media_block_t block; 50 int attached; 51 int servicing; 52 int servicefd; 53 }; 54 55 struct autoconn { 56 TAILQ_ENTRY(autoconn) entry; 57 char *host; 58 int stage; 59 int stopme; 60 int pipefd[2]; /* {read,write} */ 61 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state; 62 pthread_t thread; 63 }; 64 65 #define WS " \r\n" 66 67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq); 68 pthread_mutex_t diskmtx; 69 70 static void *service_thread(void *data); 71 static void *udev_thread(void *data); 72 static void *autoconn_thread(void *data); 73 static void master_reconnect(const char *mntpt); 74 static void disk_reconnect(const char *disk); 75 static void disk_disconnect(void *handle); 76 static void udev_check_disks(void); 77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op); 78 79 static void xdisk_reconnect(struct service_node_opaque *info); 80 static void xdisk_disconnect(void *handle); 81 static void *xdisk_attach_tmpthread(void *data); 82 83 /* 84 * Start-up the master listener daemon for the machine. This daemon runs 85 * a UDP discovery protocol, a TCP rendezvous, and scans certain files 86 * and directories for work. 87 * 88 * -- 89 * 90 * The only purpose for the UDP discovery protocol is to determine what 91 * other IPs on the LAN are running the hammer2 service daemon. DNS is not 92 * required to operate, but hostnames (if assigned) must be unique. If 93 * no hostname is assigned the host's IP is used as the name. This name 94 * is broadcast along with the mtime of the originator's private key. 95 * 96 * Receiving hammer2 service daemons which are able to match the label against 97 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection 98 * to the target. Removal of the file will cause a disconnection. A failed 99 * public key negotiation stops further connection attempts until either the 100 * file is updated or the remote mtime is updated. 101 * 102 * Generally speaking this results in a web of connections, typically a 103 * combination of point-to-point for the more important links and relayed 104 * (spanning tree) for less important or filtered links. 105 * 106 * -- 107 * 108 * The TCP listener serves as a rendezvous point in the cluster, accepting 109 * connections, performing registrations and authentications, maintaining 110 * the spanning tree, and keeping track of message state so disconnects can 111 * be handled properly. 112 * 113 * Once authenticated only low-level messaging protocols (which includes 114 * tracking persistent messages) are handled by this daemon. This daemon 115 * does not run the higher level quorum or locking protocols. 116 * 117 * -- 118 * 119 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets 120 * to connect to (which do not have to be on the local lan). This list will 121 * be retried until a connection can be established. The file is not usually 122 * needed for linkages local to the LAN. 123 */ 124 int 125 cmd_service(void) 126 { 127 struct sockaddr_in lsin; 128 int on; 129 int lfd; 130 131 /* 132 * Acquire socket and set options 133 */ 134 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 135 fprintf(stderr, "master_listen: socket(): %s\n", 136 strerror(errno)); 137 return 1; 138 } 139 on = 1; 140 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 141 142 /* 143 * Setup listen port and try to bind. If the bind fails we assume 144 * that a master listener process is already running and silently 145 * fail. 146 */ 147 bzero(&lsin, sizeof(lsin)); 148 lsin.sin_family = AF_INET; 149 lsin.sin_addr.s_addr = INADDR_ANY; 150 lsin.sin_port = htons(DMSG_LISTEN_PORT); 151 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { 152 close(lfd); 153 if (QuietOpt == 0) { 154 fprintf(stderr, 155 "master listen: daemon already running\n"); 156 } 157 return 0; 158 } 159 if (QuietOpt == 0) 160 fprintf(stderr, "master listen: startup\n"); 161 listen(lfd, 50); 162 163 /* 164 * Fork and disconnect the controlling terminal and parent process, 165 * executing the specified function as a pthread. 166 * 167 * Returns to the original process which can then continue running. 168 * In debug mode this call will create the pthread without forking 169 * and set NormalExit to 0, instead of fork. 170 */ 171 hammer2_demon(service_thread, (void *)(intptr_t)lfd); 172 if (NormalExit) 173 close(lfd); 174 return 0; 175 } 176 177 /* 178 * Master listen/accept thread. Accept connections on the master socket, 179 * starting a pthread for each one. 180 */ 181 static 182 void * 183 service_thread(void *data) 184 { 185 struct sockaddr_in asin; 186 socklen_t alen; 187 pthread_t thread; 188 dmsg_master_service_info_t *info; 189 int lfd = (int)(intptr_t)data; 190 int fd; 191 int i; 192 int count; 193 struct statfs *mntbuf = NULL; 194 struct statvfs *mntvbuf = NULL; 195 196 /* 197 * Nobody waits for us 198 */ 199 setproctitle("hammer2 master listen"); 200 pthread_detach(pthread_self()); 201 202 dmsg_node_handler = service_node_handler; 203 204 /* 205 * Start up a thread to handle block device monitoring 206 */ 207 thread = NULL; 208 pthread_create(&thread, NULL, udev_thread, NULL); 209 210 /* 211 * Start thread to manage /etc/hammer2/autoconn 212 */ 213 thread = NULL; 214 pthread_create(&thread, NULL, autoconn_thread, NULL); 215 216 /* 217 * Scan existing hammer2 mounts and reconnect to them using 218 * HAMMER2IOC_RECLUSTER. 219 */ 220 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT); 221 for (i = 0; i < count; ++i) { 222 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0) 223 master_reconnect(mntbuf[i].f_mntonname); 224 } 225 226 /* 227 * Accept connections and create pthreads to handle them after 228 * validating the IP. 229 */ 230 for (;;) { 231 alen = sizeof(asin); 232 fd = accept(lfd, (struct sockaddr *)&asin, &alen); 233 if (fd < 0) { 234 if (errno == EINTR) 235 continue; 236 break; 237 } 238 thread = NULL; 239 fprintf(stderr, "service_thread: accept fd %d\n", fd); 240 info = malloc(sizeof(*info)); 241 bzero(info, sizeof(*info)); 242 info->fd = fd; 243 info->detachme = 1; 244 info->dbgmsg_callback = hammer2_shell_parse; 245 info->label = strdup("client"); 246 pthread_create(&thread, NULL, dmsg_master_service, info); 247 } 248 return (NULL); 249 } 250 251 /* 252 * Node discovery code on received SPANs (or loss of SPANs). This code 253 * is used to track the availability of remote block devices and install 254 * or deinstall them using the xdisk driver (/dev/xdisk). 255 * 256 * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on 257 * the data handed to it. When opened, a virtual circuit is forged and 258 * maintained to the block device server via DMSG. Temporary failures 259 * stall the device until successfully reconnected or explicitly destroyed. 260 */ 261 static 262 void 263 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op) 264 { 265 struct service_node_opaque *info = *opaquep; 266 267 switch(op) { 268 case DMSG_NODEOP_ADD: 269 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK) 270 break; 271 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER) 272 break; 273 if (info == NULL) { 274 info = malloc(sizeof(*info)); 275 bzero(info, sizeof(*info)); 276 *opaquep = info; 277 } 278 snprintf(info->cl_label, sizeof(info->cl_label), 279 "%s", msg->any.lnk_span.cl_label); 280 snprintf(info->fs_label, sizeof(info->fs_label), 281 "%s", msg->any.lnk_span.fs_label); 282 info->block = msg->any.lnk_span.media.block; 283 fprintf(stderr, "NODE ADD %s serno %s\n", 284 info->cl_label, info->fs_label); 285 info->attached = 1; 286 xdisk_reconnect(info); 287 break; 288 case DMSG_NODEOP_DEL: 289 if (info) { 290 fprintf(stderr, "NODE DEL %s serno %s\n", 291 info->cl_label, info->fs_label); 292 pthread_mutex_lock(&diskmtx); 293 *opaquep = NULL; 294 info->attached = 0; 295 if (info->servicing == 0) 296 free(info); 297 else 298 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/ 299 pthread_mutex_unlock(&diskmtx); 300 } 301 break; 302 default: 303 break; 304 } 305 } 306 307 /* 308 * Monitor block devices. Currently polls every ~10 seconds or so. 309 */ 310 static 311 void * 312 udev_thread(void *data __unused) 313 { 314 int fd; 315 int seq = 0; 316 317 pthread_detach(pthread_self()); 318 319 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) { 320 fprintf(stderr, "udev_thread: unable to open \"%s\"\n", 321 UDEV_DEVICE_PATH); 322 pthread_exit(NULL); 323 } 324 udev_check_disks(); 325 while (ioctl(fd, UDEVWAIT, &seq) == 0) { 326 udev_check_disks(); 327 sleep(1); 328 } 329 return (NULL); 330 } 331 332 static void *autoconn_connect_thread(void *data); 333 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom); 334 335 static 336 void * 337 autoconn_thread(void *data __unused) 338 { 339 TAILQ_HEAD(, autoconn) autolist; 340 struct autoconn *ac; 341 struct autoconn *next; 342 pthread_t thread; 343 struct stat st; 344 time_t t; 345 time_t lmod; 346 int found_last; 347 FILE *fp; 348 char buf[256]; 349 350 TAILQ_INIT(&autolist); 351 found_last = 0; 352 lmod = 0; 353 354 pthread_detach(pthread_self()); 355 for (;;) { 356 /* 357 * Polling interval 358 */ 359 sleep(5); 360 361 /* 362 * Poll the file. Loop up if the synchronized state (lmod) 363 * has not changed. 364 */ 365 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) { 366 if (lmod == st.st_mtime) 367 continue; 368 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r"); 369 if (fp == NULL) 370 continue; 371 } else { 372 if (lmod == 0) 373 continue; 374 fp = NULL; 375 } 376 377 /* 378 * Wait at least 5 seconds after the file is created or 379 * removed. 380 * 381 * Do not update the synchronized state. 382 */ 383 if (fp == NULL && found_last) { 384 found_last = 0; 385 continue; 386 } else if (fp && found_last == 0) { 387 fclose(fp); 388 found_last = 1; 389 continue; 390 } 391 392 /* 393 * Don't scan the file until the time progresses past the 394 * file's mtime, so we can validate that the file was not 395 * further modified during our scan. 396 * 397 * Do not update the synchronized state. 398 */ 399 time(&t); 400 if (fp) { 401 if (t == st.st_mtime) { 402 fclose(fp); 403 continue; 404 } 405 t = st.st_mtime; 406 } else { 407 t = 0; 408 } 409 410 /* 411 * Set staging to disconnect, then scan the file. 412 */ 413 TAILQ_FOREACH(ac, &autolist, entry) 414 ac->stage = 0; 415 while (fp && fgets(buf, sizeof(buf), fp) != NULL) { 416 char *host; 417 418 if ((host = strtok(buf, " \t\r\n")) == NULL || 419 host[0] == '#') { 420 continue; 421 } 422 TAILQ_FOREACH(ac, &autolist, entry) { 423 if (strcmp(host, ac->host) == 0) 424 break; 425 } 426 if (ac == NULL) { 427 ac = malloc(sizeof(*ac)); 428 bzero(ac, sizeof(*ac)); 429 ac->host = strdup(host); 430 ac->state = AUTOCONN_INACTIVE; 431 TAILQ_INSERT_TAIL(&autolist, ac, entry); 432 } 433 ac->stage = 1; 434 } 435 436 /* 437 * Ignore the scan (and retry again) if the file was 438 * modified during the scan. 439 * 440 * Do not update the synchronized state. 441 */ 442 if (fp) { 443 if (fstat(fileno(fp), &st) < 0) { 444 fclose(fp); 445 continue; 446 } 447 fclose(fp); 448 if (t != st.st_mtime) 449 continue; 450 } 451 452 /* 453 * Update the synchronized state and reconfigure the 454 * connect list as needed. 455 */ 456 lmod = t; 457 next = TAILQ_FIRST(&autolist); 458 while ((ac = next) != NULL) { 459 next = TAILQ_NEXT(ac, entry); 460 461 /* 462 * Staging, initiate 463 */ 464 if (ac->stage && ac->state == AUTOCONN_INACTIVE) { 465 if (pipe(ac->pipefd) == 0) { 466 ac->stopme = 0; 467 ac->state = AUTOCONN_ACTIVE; 468 thread = NULL; 469 pthread_create(&thread, NULL, 470 autoconn_connect_thread, 471 ac); 472 } 473 } 474 475 /* 476 * Unstaging, stop active connection. 477 * 478 * We write to the pipe which causes the iocom_core 479 * to call autoconn_disconnect_signal(). 480 */ 481 if (ac->stage == 0 && 482 ac->state == AUTOCONN_ACTIVE) { 483 if (ac->stopme == 0) { 484 char dummy = 0; 485 ac->stopme = 1; 486 write(ac->pipefd[1], &dummy, 1); 487 } 488 } 489 490 /* 491 * Unstaging, delete inactive connection. 492 */ 493 if (ac->stage == 0 && 494 ac->state == AUTOCONN_INACTIVE) { 495 TAILQ_REMOVE(&autolist, ac, entry); 496 free(ac->host); 497 free(ac); 498 continue; 499 } 500 } 501 sleep(5); 502 } 503 return(NULL); 504 } 505 506 static 507 void * 508 autoconn_connect_thread(void *data) 509 { 510 dmsg_master_service_info_t *info; 511 struct autoconn *ac; 512 void *res; 513 int fd; 514 515 ac = data; 516 pthread_detach(pthread_self()); 517 518 while (ac->stopme == 0) { 519 fd = dmsg_connect(ac->host); 520 if (fd < 0) { 521 fprintf(stderr, "autoconn: Connect failure: %s\n", 522 ac->host); 523 sleep(5); 524 continue; 525 } 526 fprintf(stderr, "autoconn: Connect %s\n", ac->host); 527 528 info = malloc(sizeof(*info)); 529 bzero(info, sizeof(*info)); 530 info->fd = fd; 531 info->altfd = ac->pipefd[0]; 532 info->altmsg_callback = autoconn_disconnect_signal; 533 info->detachme = 0; 534 info->noclosealt = 1; 535 pthread_create(&ac->thread, NULL, dmsg_master_service, info); 536 pthread_join(ac->thread, &res); 537 } 538 close(ac->pipefd[0]); 539 ac->state = AUTOCONN_INACTIVE; 540 /* auto structure can be ripped out here */ 541 return(NULL); 542 } 543 544 static 545 void 546 autoconn_disconnect_signal(dmsg_iocom_t *iocom) 547 { 548 fprintf(stderr, "autoconn: Shutting down socket\n"); 549 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 550 } 551 552 /* 553 * Retrieve the list of disk attachments and attempt to export 554 * them. 555 */ 556 static 557 void 558 udev_check_disks(void) 559 { 560 char tmpbuf[1024]; 561 char *buf = NULL; 562 char *disk; 563 int error; 564 size_t n; 565 566 for (;;) { 567 n = 0; 568 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0); 569 if (error < 0 || n == 0) 570 break; 571 if (n >= sizeof(tmpbuf)) 572 buf = malloc(n + 1); 573 else 574 buf = tmpbuf; 575 error = sysctlbyname("kern.disks", buf, &n, NULL, 0); 576 if (error == 0) { 577 buf[n] = 0; 578 break; 579 } 580 if (buf != tmpbuf) { 581 free(buf); 582 buf = NULL; 583 } 584 if (errno != ENOMEM) 585 break; 586 } 587 if (buf) { 588 fprintf(stderr, "DISKS: %s\n", buf); 589 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) { 590 disk_reconnect(disk); 591 } 592 if (buf != tmpbuf) 593 free(buf); 594 } 595 } 596 597 /* 598 * Normally the mount program supplies a cluster communications 599 * descriptor to the hammer2 vfs on mount, but if you kill the service 600 * daemon and restart it that link will be lost. 601 * 602 * This procedure attempts to [re]connect to existing mounts when 603 * the service daemon is started up before going into its accept 604 * loop. 605 * 606 * NOTE: A hammer2 mount point can only accomodate one connection at a time 607 * so this will disconnect any existing connection during the 608 * reconnect. 609 */ 610 static 611 void 612 master_reconnect(const char *mntpt) 613 { 614 struct hammer2_ioc_recluster recls; 615 dmsg_master_service_info_t *info; 616 pthread_t thread; 617 int fd; 618 int pipefds[2]; 619 620 fd = open(mntpt, O_RDONLY); 621 if (fd < 0) { 622 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt); 623 return; 624 } 625 if (pipe(pipefds) < 0) { 626 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt); 627 close(fd); 628 return; 629 } 630 bzero(&recls, sizeof(recls)); 631 recls.fd = pipefds[0]; 632 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) { 633 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt); 634 close(pipefds[0]); 635 close(pipefds[1]); 636 close(fd); 637 return; 638 } 639 close(pipefds[0]); 640 close(fd); 641 642 info = malloc(sizeof(*info)); 643 bzero(info, sizeof(*info)); 644 info->fd = pipefds[1]; 645 info->detachme = 1; 646 info->dbgmsg_callback = hammer2_shell_parse; 647 info->label = strdup("hammer2"); 648 pthread_create(&thread, NULL, dmsg_master_service, info); 649 } 650 651 /* 652 * Reconnect a physical disk service to the mesh. 653 */ 654 static 655 void 656 disk_reconnect(const char *disk) 657 { 658 struct disk_ioc_recluster recls; 659 struct diskcon *dc; 660 dmsg_master_service_info_t *info; 661 pthread_t thread; 662 int fd; 663 int pipefds[2]; 664 char *path; 665 666 /* 667 * Urm, this will auto-create mdX+1, just ignore for now. 668 * This mechanic needs to be fixed. It might actually be nice 669 * to be able to export md disks. 670 */ 671 if (strncmp(disk, "md", 2) == 0) 672 return; 673 if (strncmp(disk, "xa", 2) == 0) 674 return; 675 676 /* 677 * Check if already connected 678 */ 679 pthread_mutex_lock(&diskmtx); 680 TAILQ_FOREACH(dc, &diskconq, entry) { 681 if (strcmp(dc->disk, disk) == 0) 682 break; 683 } 684 pthread_mutex_unlock(&diskmtx); 685 if (dc) 686 return; 687 688 /* 689 * Not already connected, create a connection to the kernel 690 * disk driver. 691 */ 692 asprintf(&path, "/dev/%s", disk); 693 fd = open(path, O_RDONLY); 694 if (fd < 0) { 695 fprintf(stderr, "reconnect %s: no access to disk\n", disk); 696 free(path); 697 return; 698 } 699 free(path); 700 if (pipe(pipefds) < 0) { 701 fprintf(stderr, "reconnect %s: pipe() failed\n", disk); 702 close(fd); 703 return; 704 } 705 bzero(&recls, sizeof(recls)); 706 recls.fd = pipefds[0]; 707 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) { 708 fprintf(stderr, "reconnect %s: ioctl failed\n", disk); 709 close(pipefds[0]); 710 close(pipefds[1]); 711 close(fd); 712 return; 713 } 714 close(pipefds[0]); 715 close(fd); 716 717 dc = malloc(sizeof(*dc)); 718 dc->disk = strdup(disk); 719 pthread_mutex_lock(&diskmtx); 720 TAILQ_INSERT_TAIL(&diskconq, dc, entry); 721 pthread_mutex_unlock(&diskmtx); 722 723 info = malloc(sizeof(*info)); 724 bzero(info, sizeof(*info)); 725 info->fd = pipefds[1]; 726 info->detachme = 1; 727 info->dbgmsg_callback = hammer2_shell_parse; 728 info->exit_callback = disk_disconnect; 729 info->handle = dc; 730 info->label = strdup(dc->disk); 731 pthread_create(&thread, NULL, dmsg_master_service, info); 732 } 733 734 static 735 void 736 disk_disconnect(void *handle) 737 { 738 struct diskcon *dc = handle; 739 740 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk); 741 742 pthread_mutex_lock(&diskmtx); 743 TAILQ_REMOVE(&diskconq, dc, entry); 744 pthread_mutex_unlock(&diskmtx); 745 free(dc->disk); 746 free(dc); 747 } 748 749 /* 750 * [re]connect a remote disk service to the local system via /dev/xdisk. 751 */ 752 static 753 void 754 xdisk_reconnect(struct service_node_opaque *xdisk) 755 { 756 struct xdisk_attach_ioctl *xaioc; 757 dmsg_master_service_info_t *info; 758 pthread_t thread; 759 int pipefds[2]; 760 761 if (pipe(pipefds) < 0) { 762 fprintf(stderr, "reconnect %s: pipe() failed\n", 763 xdisk->cl_label); 764 return; 765 } 766 767 info = malloc(sizeof(*info)); 768 bzero(info, sizeof(*info)); 769 info->fd = pipefds[1]; 770 info->detachme = 1; 771 info->dbgmsg_callback = hammer2_shell_parse; 772 info->exit_callback = xdisk_disconnect; 773 info->handle = xdisk; 774 xdisk->servicing = 1; 775 xdisk->servicefd = info->fd; 776 info->label = strdup(xdisk->cl_label); 777 pthread_create(&thread, NULL, dmsg_master_service, info); 778 779 /* 780 * We have to run the attach in its own pthread because it will 781 * synchronously interact with the messaging subsystem over the 782 * pipe. If we do it here we will deadlock. 783 */ 784 xaioc = malloc(sizeof(*xaioc)); 785 bzero(xaioc, sizeof(*xaioc)); 786 snprintf(xaioc->cl_label, sizeof(xaioc->cl_label), 787 "%s", xdisk->cl_label); 788 snprintf(xaioc->fs_label, sizeof(xaioc->fs_label), 789 "X-%s", xdisk->fs_label); 790 xaioc->bytes = xdisk->block.bytes; 791 xaioc->blksize = xdisk->block.blksize; 792 xaioc->fd = pipefds[0]; 793 794 pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc); 795 } 796 797 static 798 void * 799 xdisk_attach_tmpthread(void *data) 800 { 801 struct xdisk_attach_ioctl *xaioc = data; 802 int fd; 803 804 pthread_detach(pthread_self()); 805 806 fd = open("/dev/xdisk", O_RDWR, 0600); 807 if (fd < 0) { 808 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n"); 809 } 810 if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) { 811 fprintf(stderr, "reconnect %s: xdisk attach failed\n", 812 xaioc->cl_label); 813 } 814 close(xaioc->fd); 815 close(fd); 816 return (NULL); 817 } 818 819 static 820 void 821 xdisk_disconnect(void *handle) 822 { 823 struct service_node_opaque *info = handle; 824 825 assert(info->servicing == 1); 826 827 pthread_mutex_lock(&diskmtx); 828 info->servicing = 0; 829 if (info->attached == 0) 830 free(info); 831 pthread_mutex_unlock(&diskmtx); 832 } 833