1 /* 2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 34 */ 35 36 #include "hammer2.h" 37 38 #include <sys/xdiskioctl.h> 39 #include <machine/atomic.h> 40 41 struct hammer2_media_config { 42 hammer2_volconf_t copy_run; 43 hammer2_volconf_t copy_pend; 44 pthread_t thread; 45 pthread_cond_t cond; 46 int ctl; 47 int fd; 48 int pipefd[2]; /* signal stop */ 49 dmsg_iocom_t iocom; 50 pthread_t iocom_thread; 51 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state; 52 }; 53 54 typedef struct hammer2_media_config hammer2_media_config_t; 55 56 #define H2CONFCTL_STOP 0x00000001 57 #define H2CONFCTL_UPDATE 0x00000002 58 59 struct diskcon { 60 TAILQ_ENTRY(diskcon) entry; 61 char *disk; 62 }; 63 64 struct service_node_opaque { 65 char cl_label[64]; 66 char fs_label[64]; 67 dmsg_media_block_t block; 68 int attached; 69 int servicing; 70 int servicefd; 71 }; 72 73 struct autoconn { 74 TAILQ_ENTRY(autoconn) entry; 75 char *host; 76 int stage; 77 int stopme; 78 int pipefd[2]; /* {read,write} */ 79 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state; 80 pthread_t thread; 81 }; 82 83 #define WS " \r\n" 84 85 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq); 86 static pthread_mutex_t diskmtx; 87 static pthread_mutex_t confmtx; 88 89 static void *service_thread(void *data); 90 static void *udev_thread(void *data); 91 static void *autoconn_thread(void *data); 92 static void master_reconnect(const char *mntpt); 93 static void disk_reconnect(const char *disk); 94 static void disk_disconnect(void *handle); 95 static void udev_check_disks(void); 96 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged); 97 static void *hammer2_volconf_thread(void *info); 98 static void hammer2_volconf_signal(dmsg_iocom_t *iocom); 99 static void hammer2_volconf_start(hammer2_media_config_t *conf, 100 const char *hostname); 101 static void hammer2_volconf_stop(hammer2_media_config_t *conf); 102 103 104 static void xdisk_connect(void); 105 106 /* 107 * Start-up the master listener daemon for the machine. This daemon runs 108 * a UDP discovery protocol, a TCP rendezvous, and scans certain files 109 * and directories for work. 110 * 111 * -- 112 * 113 * The only purpose for the UDP discovery protocol is to determine what 114 * other IPs on the LAN are running the hammer2 service daemon. DNS is not 115 * required to operate, but hostnames (if assigned) must be unique. If 116 * no hostname is assigned the host's IP is used as the name. This name 117 * is broadcast along with the mtime of the originator's private key. 118 * 119 * Receiving hammer2 service daemons which are able to match the label against 120 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection 121 * to the target. Removal of the file will cause a disconnection. A failed 122 * public key negotiation stops further connection attempts until either the 123 * file is updated or the remote mtime is updated. 124 * 125 * Generally speaking this results in a web of connections, typically a 126 * combination of point-to-point for the more important links and relayed 127 * (spanning tree) for less important or filtered links. 128 * 129 * -- 130 * 131 * The TCP listener serves as a rendezvous point in the cluster, accepting 132 * connections, performing registrations and authentications, maintaining 133 * the spanning tree, and keeping track of message state so disconnects can 134 * be handled properly. 135 * 136 * Once authenticated only low-level messaging protocols (which includes 137 * tracking persistent messages) are handled by this daemon. This daemon 138 * does not run the higher level quorum or locking protocols. 139 * 140 * -- 141 * 142 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets 143 * to connect to (which do not have to be on the local lan). This list will 144 * be retried until a connection can be established. The file is not usually 145 * needed for linkages local to the LAN. 146 */ 147 int 148 cmd_service(void) 149 { 150 struct sockaddr_in lsin; 151 int on; 152 int lfd; 153 154 /* 155 * Acquire socket and set options 156 */ 157 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 158 fprintf(stderr, "master_listen: socket(): %s\n", 159 strerror(errno)); 160 return 1; 161 } 162 on = 1; 163 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 164 165 /* 166 * Setup listen port and try to bind. If the bind fails we assume 167 * that a master listener process is already running and silently 168 * fail. 169 */ 170 bzero(&lsin, sizeof(lsin)); 171 lsin.sin_family = AF_INET; 172 lsin.sin_addr.s_addr = INADDR_ANY; 173 lsin.sin_port = htons(DMSG_LISTEN_PORT); 174 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { 175 close(lfd); 176 if (QuietOpt == 0) { 177 fprintf(stderr, 178 "master listen: daemon already running\n"); 179 } 180 return 0; 181 } 182 if (QuietOpt == 0) 183 fprintf(stderr, "master listen: startup\n"); 184 listen(lfd, 50); 185 186 /* 187 * Fork and disconnect the controlling terminal and parent process, 188 * executing the specified function as a pthread. 189 * 190 * Returns to the original process which can then continue running. 191 * In debug mode this call will create the pthread without forking 192 * and set NormalExit to 0, instead of fork. 193 */ 194 hammer2_demon(service_thread, (void *)(intptr_t)lfd); 195 if (NormalExit) 196 close(lfd); 197 return 0; 198 } 199 200 /* 201 * Master listen/accept thread. Accept connections on the master socket, 202 * starting a pthread for each one. 203 */ 204 static 205 void * 206 service_thread(void *data) 207 { 208 struct sockaddr_in asin; 209 socklen_t alen; 210 pthread_t thread; 211 dmsg_master_service_info_t *info; 212 int lfd = (int)(intptr_t)data; 213 int fd; 214 int i; 215 int count; 216 int opt; 217 struct statfs *mntbuf = NULL; 218 struct statvfs *mntvbuf = NULL; 219 220 /* 221 * Nobody waits for us 222 */ 223 setproctitle("hammer2 master listen"); 224 pthread_detach(pthread_self()); 225 226 /* 227 * Start up a thread to handle block device monitoring for 228 * export to the cluster. 229 */ 230 thread = NULL; 231 pthread_create(&thread, NULL, udev_thread, NULL); 232 233 /* 234 * Start up a thread to tie /dev/xdisk into the cluster 235 * controller. 236 */ 237 xdisk_connect(); 238 239 /* 240 * Start thread to manage /etc/hammer2/autoconn 241 */ 242 thread = NULL; 243 pthread_create(&thread, NULL, autoconn_thread, NULL); 244 245 /* 246 * Scan existing hammer2 mounts and reconnect to them using 247 * HAMMER2IOC_RECLUSTER. 248 */ 249 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT); 250 for (i = 0; i < count; ++i) { 251 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0) 252 master_reconnect(mntbuf[i].f_mntonname); 253 } 254 255 /* 256 * Accept connections and create pthreads to handle them after 257 * validating the IP. 258 */ 259 for (;;) { 260 alen = sizeof(asin); 261 fd = accept(lfd, (struct sockaddr *)&asin, &alen); 262 if (fd < 0) { 263 if (errno == EINTR) 264 continue; 265 break; 266 } 267 opt = 1; 268 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt); 269 thread = NULL; 270 fprintf(stderr, "service_thread: accept fd %d\n", fd); 271 info = malloc(sizeof(*info)); 272 bzero(info, sizeof(*info)); 273 info->fd = fd; 274 info->detachme = 1; 275 info->usrmsg_callback = hammer2_usrmsg_handler; 276 info->label = strdup("client"); 277 pthread_create(&thread, NULL, dmsg_master_service, info); 278 } 279 return (NULL); 280 } 281 282 /* 283 * Handle/Monitor the dmsg stream. If unmanaged is set we are responsible 284 * for responding for the message, otherwise if it is not set libdmsg has 285 * already done some preprocessing and will respond to the message for us 286 * when we return. 287 * 288 * We primarily monitor for VOLCONFs 289 */ 290 static 291 void 292 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged) 293 { 294 dmsg_state_t *state; 295 hammer2_media_config_t *conf; 296 dmsg_lnk_hammer2_volconf_t *msgconf; 297 int i; 298 299 /* 300 * Only process messages which are part of a LNK_CONN stream 301 */ 302 state = msg->state; 303 if (state == NULL || 304 (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) { 305 hammer2_shell_parse(msg, unmanaged); 306 return; 307 } 308 309 switch(msg->tcmd) { 310 case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE: 311 case DMSG_LNK_CONN | DMSGF_DELETE: 312 case DMSG_LNK_ERROR | DMSGF_DELETE: 313 /* 314 * Deleting connection, clean out all volume configs 315 */ 316 if (state->media == NULL || state->media->usrhandle == NULL) 317 break; 318 conf = state->media->usrhandle; 319 fprintf(stderr, "Shutting down media spans\n"); 320 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { 321 if (conf[i].thread) { 322 conf[i].ctl = H2CONFCTL_STOP; 323 pthread_cond_signal(&conf[i].cond); 324 } 325 } 326 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { 327 if (conf[i].thread) { 328 pthread_join(conf[i].thread, NULL); 329 conf->thread = NULL; 330 pthread_cond_destroy(&conf[i].cond); 331 } 332 } 333 state->media->usrhandle = NULL; 334 free(conf); 335 break; 336 case DMSG_LNK_HAMMER2_VOLCONF: 337 /* 338 * One-way volume-configuration message is transmitted 339 * over the open LNK_CONN transaction. 340 */ 341 fprintf(stderr, "RECEIVED VOLCONF\n"); 342 343 if ((conf = state->media->usrhandle) == NULL) { 344 conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT); 345 bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT); 346 state->media->usrhandle = conf; 347 } 348 msgconf = H2_LNK_VOLCONF(msg); 349 350 if (msgconf->index < 0 || 351 msgconf->index >= HAMMER2_COPYID_COUNT) { 352 fprintf(stderr, 353 "VOLCONF: ILLEGAL INDEX %d\n", 354 msgconf->index); 355 break; 356 } 357 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 || 358 msgconf->copy.path[0] == 0) { 359 fprintf(stderr, 360 "VOLCONF: ILLEGAL PATH %d\n", 361 msgconf->index); 362 break; 363 } 364 conf += msgconf->index; 365 pthread_mutex_lock(&confmtx); 366 conf->copy_pend = msgconf->copy; 367 conf->ctl |= H2CONFCTL_UPDATE; 368 pthread_mutex_unlock(&confmtx); 369 if (conf->thread == NULL) { 370 fprintf(stderr, "VOLCONF THREAD STARTED\n"); 371 pthread_cond_init(&conf->cond, NULL); 372 pthread_create(&conf->thread, NULL, 373 hammer2_volconf_thread, (void *)conf); 374 } 375 pthread_cond_signal(&conf->cond); 376 break; 377 default: 378 if (unmanaged) 379 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP); 380 break; 381 } 382 } 383 384 static void * 385 hammer2_volconf_thread(void *info) 386 { 387 hammer2_media_config_t *conf = info; 388 389 setproctitle("hammer2 volconf"); 390 391 pthread_mutex_lock(&confmtx); 392 while ((conf->ctl & H2CONFCTL_STOP) == 0) { 393 if (conf->ctl & H2CONFCTL_UPDATE) { 394 fprintf(stderr, "VOLCONF UPDATE\n"); 395 conf->ctl &= ~H2CONFCTL_UPDATE; 396 if (bcmp(&conf->copy_run, &conf->copy_pend, 397 sizeof(conf->copy_run)) == 0) { 398 fprintf(stderr, "VOLCONF: no changes\n"); 399 continue; 400 } 401 /* 402 * XXX TODO - auto reconnect on lookup failure or 403 * connect failure or stream failure. 404 */ 405 406 pthread_mutex_unlock(&confmtx); 407 hammer2_volconf_stop(conf); 408 conf->copy_run = conf->copy_pend; 409 if (conf->copy_run.copyid != 0 && 410 strncmp(conf->copy_run.path, "span:", 5) == 0) { 411 hammer2_volconf_start(conf, 412 conf->copy_run.path + 5); 413 } 414 pthread_mutex_lock(&confmtx); 415 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state); 416 } 417 if (conf->state == H2MC_CONNECT) { 418 hammer2_volconf_start(conf, conf->copy_run.path + 5); 419 pthread_mutex_unlock(&confmtx); 420 sleep(5); 421 pthread_mutex_lock(&confmtx); 422 } else { 423 pthread_cond_wait(&conf->cond, &confmtx); 424 } 425 } 426 pthread_mutex_unlock(&confmtx); 427 hammer2_volconf_stop(conf); 428 return(NULL); 429 } 430 431 static 432 void 433 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname) 434 { 435 dmsg_master_service_info_t *info; 436 437 switch(conf->state) { 438 case H2MC_STOPPED: 439 case H2MC_CONNECT: 440 conf->fd = dmsg_connect(hostname); 441 if (conf->fd < 0) { 442 fprintf(stderr, "Unable to connect to %s\n", hostname); 443 conf->state = H2MC_CONNECT; 444 } else if (pipe(conf->pipefd) < 0) { 445 close(conf->fd); 446 fprintf(stderr, "pipe() failed during volconf\n"); 447 conf->state = H2MC_CONNECT; 448 } else { 449 fprintf(stderr, "VOLCONF CONNECT\n"); 450 info = malloc(sizeof(*info)); 451 bzero(info, sizeof(*info)); 452 info->fd = conf->fd; 453 info->altfd = conf->pipefd[0]; 454 info->altmsg_callback = hammer2_volconf_signal; 455 info->usrmsg_callback = hammer2_usrmsg_handler; 456 info->detachme = 0; 457 conf->state = H2MC_RUNNING; 458 pthread_create(&conf->iocom_thread, NULL, 459 dmsg_master_service, info); 460 } 461 break; 462 case H2MC_RUNNING: 463 break; 464 } 465 } 466 467 static 468 void 469 hammer2_volconf_stop(hammer2_media_config_t *conf) 470 { 471 switch(conf->state) { 472 case H2MC_STOPPED: 473 break; 474 case H2MC_CONNECT: 475 conf->state = H2MC_STOPPED; 476 break; 477 case H2MC_RUNNING: 478 close(conf->pipefd[1]); 479 conf->pipefd[1] = -1; 480 pthread_join(conf->iocom_thread, NULL); 481 conf->iocom_thread = NULL; 482 conf->state = H2MC_STOPPED; 483 break; 484 } 485 } 486 487 static 488 void 489 hammer2_volconf_signal(dmsg_iocom_t *iocom) 490 { 491 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 492 } 493 494 /* 495 * Monitor block devices. Currently polls every ~10 seconds or so. 496 */ 497 static 498 void * 499 udev_thread(void *data __unused) 500 { 501 int fd; 502 int seq = 0; 503 504 pthread_detach(pthread_self()); 505 setproctitle("hammer2 udev_thread"); 506 507 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) { 508 fprintf(stderr, "udev_thread: unable to open \"%s\"\n", 509 UDEV_DEVICE_PATH); 510 pthread_exit(NULL); 511 } 512 udev_check_disks(); 513 while (ioctl(fd, UDEVWAIT, &seq) == 0) { 514 udev_check_disks(); 515 sleep(1); 516 } 517 return (NULL); 518 } 519 520 static void *autoconn_connect_thread(void *data); 521 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom); 522 523 static 524 void * 525 autoconn_thread(void *data __unused) 526 { 527 TAILQ_HEAD(, autoconn) autolist; 528 struct autoconn *ac; 529 struct autoconn *next; 530 pthread_t thread; 531 struct stat st; 532 time_t t; 533 time_t lmod; 534 int found_last; 535 FILE *fp; 536 char buf[256]; 537 538 TAILQ_INIT(&autolist); 539 found_last = 0; 540 lmod = 0; 541 542 pthread_detach(pthread_self()); 543 setproctitle("hammer2 autoconn_thread"); 544 for (;;) { 545 /* 546 * Polling interval 547 */ 548 sleep(5); 549 550 /* 551 * Poll the file. Loop up if the synchronized state (lmod) 552 * has not changed. 553 */ 554 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) { 555 if (lmod == st.st_mtime) 556 continue; 557 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r"); 558 if (fp == NULL) 559 continue; 560 } else { 561 if (lmod == 0) 562 continue; 563 fp = NULL; 564 } 565 566 /* 567 * Wait at least 5 seconds after the file is created or 568 * removed. 569 * 570 * Do not update the synchronized state. 571 */ 572 if (fp == NULL && found_last) { 573 found_last = 0; 574 continue; 575 } else if (fp && found_last == 0) { 576 fclose(fp); 577 found_last = 1; 578 continue; 579 } 580 581 /* 582 * Don't scan the file until the time progresses past the 583 * file's mtime, so we can validate that the file was not 584 * further modified during our scan. 585 * 586 * Do not update the synchronized state. 587 */ 588 time(&t); 589 if (fp) { 590 if (t == st.st_mtime) { 591 fclose(fp); 592 continue; 593 } 594 t = st.st_mtime; 595 } else { 596 t = 0; 597 } 598 599 /* 600 * Set staging to disconnect, then scan the file. 601 */ 602 TAILQ_FOREACH(ac, &autolist, entry) 603 ac->stage = 0; 604 while (fp && fgets(buf, sizeof(buf), fp) != NULL) { 605 char *host; 606 607 if ((host = strtok(buf, " \t\r\n")) == NULL || 608 host[0] == '#') { 609 continue; 610 } 611 TAILQ_FOREACH(ac, &autolist, entry) { 612 if (strcmp(host, ac->host) == 0) 613 break; 614 } 615 if (ac == NULL) { 616 ac = malloc(sizeof(*ac)); 617 bzero(ac, sizeof(*ac)); 618 ac->host = strdup(host); 619 ac->state = AUTOCONN_INACTIVE; 620 TAILQ_INSERT_TAIL(&autolist, ac, entry); 621 } 622 ac->stage = 1; 623 } 624 625 /* 626 * Ignore the scan (and retry again) if the file was 627 * modified during the scan. 628 * 629 * Do not update the synchronized state. 630 */ 631 if (fp) { 632 if (fstat(fileno(fp), &st) < 0) { 633 fclose(fp); 634 continue; 635 } 636 fclose(fp); 637 if (t != st.st_mtime) 638 continue; 639 } 640 641 /* 642 * Update the synchronized state and reconfigure the 643 * connect list as needed. 644 */ 645 lmod = t; 646 next = TAILQ_FIRST(&autolist); 647 while ((ac = next) != NULL) { 648 next = TAILQ_NEXT(ac, entry); 649 650 /* 651 * Staging, initiate 652 */ 653 if (ac->stage && ac->state == AUTOCONN_INACTIVE) { 654 if (pipe(ac->pipefd) == 0) { 655 ac->stopme = 0; 656 ac->state = AUTOCONN_ACTIVE; 657 thread = NULL; 658 pthread_create(&thread, NULL, 659 autoconn_connect_thread, 660 ac); 661 } 662 } 663 664 /* 665 * Unstaging, stop active connection. 666 * 667 * We write to the pipe which causes the iocom_core 668 * to call autoconn_disconnect_signal(). 669 */ 670 if (ac->stage == 0 && 671 ac->state == AUTOCONN_ACTIVE) { 672 if (ac->stopme == 0) { 673 char dummy = 0; 674 ac->stopme = 1; 675 write(ac->pipefd[1], &dummy, 1); 676 } 677 } 678 679 /* 680 * Unstaging, delete inactive connection. 681 */ 682 if (ac->stage == 0 && 683 ac->state == AUTOCONN_INACTIVE) { 684 TAILQ_REMOVE(&autolist, ac, entry); 685 free(ac->host); 686 free(ac); 687 continue; 688 } 689 } 690 sleep(5); 691 } 692 return(NULL); 693 } 694 695 static 696 void * 697 autoconn_connect_thread(void *data) 698 { 699 dmsg_master_service_info_t *info; 700 struct autoconn *ac; 701 void *res; 702 int fd; 703 704 ac = data; 705 pthread_detach(pthread_self()); 706 setproctitle("hammer2 dmsg"); 707 708 while (ac->stopme == 0) { 709 fd = dmsg_connect(ac->host); 710 if (fd < 0) { 711 if (DMsgDebugOpt > 2) { 712 fprintf(stderr, 713 "autoconn: Connect failure: %s\n", 714 ac->host); 715 } 716 sleep(5); 717 continue; 718 } 719 fprintf(stderr, "autoconn: Connect %s\n", ac->host); 720 721 info = malloc(sizeof(*info)); 722 bzero(info, sizeof(*info)); 723 info->fd = fd; 724 info->altfd = ac->pipefd[0]; 725 info->altmsg_callback = autoconn_disconnect_signal; 726 info->usrmsg_callback = hammer2_usrmsg_handler; 727 info->detachme = 0; 728 info->noclosealt = 1; 729 pthread_create(&ac->thread, NULL, dmsg_master_service, info); 730 pthread_join(ac->thread, &res); 731 } 732 close(ac->pipefd[0]); 733 ac->state = AUTOCONN_INACTIVE; 734 /* auto structure can be ripped out here */ 735 return(NULL); 736 } 737 738 static 739 void 740 autoconn_disconnect_signal(dmsg_iocom_t *iocom) 741 { 742 fprintf(stderr, "autoconn: Shutting down socket\n"); 743 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 744 } 745 746 /* 747 * Retrieve the list of disk attachments and attempt to export 748 * them. 749 */ 750 static 751 void 752 udev_check_disks(void) 753 { 754 char tmpbuf[1024]; 755 char *buf = NULL; 756 char *disk; 757 int error; 758 size_t n; 759 760 for (;;) { 761 n = 0; 762 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0); 763 if (error < 0 || n == 0) 764 break; 765 if (n >= sizeof(tmpbuf)) 766 buf = malloc(n + 1); 767 else 768 buf = tmpbuf; 769 error = sysctlbyname("kern.disks", buf, &n, NULL, 0); 770 if (error == 0) { 771 buf[n] = 0; 772 break; 773 } 774 if (buf != tmpbuf) { 775 free(buf); 776 buf = NULL; 777 } 778 if (errno != ENOMEM) 779 break; 780 } 781 if (buf) { 782 fprintf(stderr, "DISKS: %s\n", buf); 783 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) { 784 disk_reconnect(disk); 785 } 786 if (buf != tmpbuf) 787 free(buf); 788 } 789 } 790 791 /* 792 * Normally the mount program supplies a cluster communications 793 * descriptor to the hammer2 vfs on mount, but if you kill the service 794 * daemon and restart it that link will be lost. 795 * 796 * This procedure attempts to [re]connect to existing mounts when 797 * the service daemon is started up before going into its accept 798 * loop. 799 * 800 * NOTE: A hammer2 mount point can only accomodate one connection at a time 801 * so this will disconnect any existing connection during the 802 * reconnect. 803 */ 804 static 805 void 806 master_reconnect(const char *mntpt) 807 { 808 struct hammer2_ioc_recluster recls; 809 dmsg_master_service_info_t *info; 810 pthread_t thread; 811 int fd; 812 int pipefds[2]; 813 814 fd = open(mntpt, O_RDONLY); 815 if (fd < 0) { 816 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt); 817 return; 818 } 819 if (pipe(pipefds) < 0) { 820 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt); 821 close(fd); 822 return; 823 } 824 bzero(&recls, sizeof(recls)); 825 recls.fd = pipefds[0]; 826 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) { 827 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt); 828 close(pipefds[0]); 829 close(pipefds[1]); 830 close(fd); 831 return; 832 } 833 close(pipefds[0]); 834 close(fd); 835 836 info = malloc(sizeof(*info)); 837 bzero(info, sizeof(*info)); 838 info->fd = pipefds[1]; 839 info->detachme = 1; 840 info->usrmsg_callback = hammer2_usrmsg_handler; 841 info->label = strdup("hammer2"); 842 pthread_create(&thread, NULL, dmsg_master_service, info); 843 } 844 845 /* 846 * Reconnect a physical disk service to the mesh. 847 */ 848 static 849 void 850 disk_reconnect(const char *disk) 851 { 852 struct disk_ioc_recluster recls; 853 struct diskcon *dc; 854 dmsg_master_service_info_t *info; 855 pthread_t thread; 856 int fd; 857 int pipefds[2]; 858 char *path; 859 860 /* 861 * Urm, this will auto-create mdX+1, just ignore for now. 862 * This mechanic needs to be fixed. It might actually be nice 863 * to be able to export md disks. 864 */ 865 if (strncmp(disk, "md", 2) == 0) 866 return; 867 if (strncmp(disk, "xa", 2) == 0) 868 return; 869 870 /* 871 * Check if already connected 872 */ 873 pthread_mutex_lock(&diskmtx); 874 TAILQ_FOREACH(dc, &diskconq, entry) { 875 if (strcmp(dc->disk, disk) == 0) 876 break; 877 } 878 pthread_mutex_unlock(&diskmtx); 879 if (dc) 880 return; 881 882 /* 883 * Not already connected, create a connection to the kernel 884 * disk driver. 885 */ 886 asprintf(&path, "/dev/%s", disk); 887 fd = open(path, O_RDONLY); 888 if (fd < 0) { 889 fprintf(stderr, "reconnect %s: no access to disk\n", disk); 890 free(path); 891 return; 892 } 893 free(path); 894 if (pipe(pipefds) < 0) { 895 fprintf(stderr, "reconnect %s: pipe() failed\n", disk); 896 close(fd); 897 return; 898 } 899 bzero(&recls, sizeof(recls)); 900 recls.fd = pipefds[0]; 901 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) { 902 fprintf(stderr, "reconnect %s: ioctl failed\n", disk); 903 close(pipefds[0]); 904 close(pipefds[1]); 905 close(fd); 906 return; 907 } 908 close(pipefds[0]); 909 close(fd); 910 911 dc = malloc(sizeof(*dc)); 912 dc->disk = strdup(disk); 913 pthread_mutex_lock(&diskmtx); 914 TAILQ_INSERT_TAIL(&diskconq, dc, entry); 915 pthread_mutex_unlock(&diskmtx); 916 917 info = malloc(sizeof(*info)); 918 bzero(info, sizeof(*info)); 919 info->fd = pipefds[1]; 920 info->detachme = 1; 921 info->usrmsg_callback = hammer2_usrmsg_handler; 922 info->exit_callback = disk_disconnect; 923 info->handle = dc; 924 info->label = strdup(dc->disk); 925 pthread_create(&thread, NULL, dmsg_master_service, info); 926 } 927 928 static 929 void 930 disk_disconnect(void *handle) 931 { 932 struct diskcon *dc = handle; 933 934 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk); 935 936 pthread_mutex_lock(&diskmtx); 937 TAILQ_REMOVE(&diskconq, dc, entry); 938 pthread_mutex_unlock(&diskmtx); 939 free(dc->disk); 940 free(dc); 941 } 942 943 /* 944 * Connect our cluster controller to /dev/xdisk. xdisk will pick up 945 * SPAN messages that we route to it, makes remote block devices 946 * available to the host, and can issue dmsg transactions based on 947 * device requests. 948 */ 949 static 950 void 951 xdisk_connect(void) 952 { 953 dmsg_master_service_info_t *info; 954 struct xdisk_attach_ioctl xaioc; 955 pthread_t thread; 956 int pipefds[2]; 957 int xfd; 958 int error; 959 960 /* 961 * Is /dev/xdisk available? 962 */ 963 xfd = open("/dev/xdisk", O_RDWR, 0600); 964 if (xfd < 0) { 965 fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n"); 966 return; 967 } 968 969 if (pipe(pipefds) < 0) { 970 fprintf(stderr, "xdisk_connect: pipe() failed\n"); 971 return; 972 } 973 974 /* 975 * Pipe between cluster controller (this user process). 976 */ 977 info = malloc(sizeof(*info)); 978 bzero(info, sizeof(*info)); 979 info->fd = pipefds[1]; 980 info->detachme = 1; 981 info->usrmsg_callback = hammer2_usrmsg_handler; 982 info->exit_callback = NULL; 983 pthread_create(&thread, NULL, dmsg_master_service, info); 984 985 /* 986 * And the xdisk device. 987 */ 988 bzero(&xaioc, sizeof(xaioc)); 989 xaioc.fd = pipefds[0]; 990 error = ioctl(xfd, XDISKIOCATTACH, &xaioc); 991 close(pipefds[0]); 992 close(xfd); 993 994 if (error < 0) { 995 fprintf(stderr, 996 "xdisk_connect: cannot attach %s\n", 997 strerror(errno)); 998 return; 999 } 1000 } 1001