1 /* 2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 34 */ 35 36 #include "hammer2.h" 37 38 #include <sys/xdiskioctl.h> 39 #include <machine/atomic.h> 40 41 struct hammer2_media_config { 42 hammer2_volconf_t copy_run; 43 hammer2_volconf_t copy_pend; 44 pthread_t thread; 45 int thread_started; 46 pthread_cond_t cond; 47 int ctl; 48 int fd; 49 int pipefd[2]; /* signal stop */ 50 dmsg_iocom_t iocom; 51 pthread_t iocom_thread; 52 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state; 53 }; 54 55 typedef struct hammer2_media_config hammer2_media_config_t; 56 57 #define H2CONFCTL_STOP 0x00000001 58 #define H2CONFCTL_UPDATE 0x00000002 59 60 struct diskcon { 61 TAILQ_ENTRY(diskcon) entry; 62 char *disk; 63 }; 64 65 struct service_node_opaque { 66 char cl_label[64]; 67 char fs_label[64]; 68 dmsg_media_block_t block; 69 int attached; 70 int servicing; 71 int servicefd; 72 }; 73 74 struct autoconn { 75 TAILQ_ENTRY(autoconn) entry; 76 char *host; 77 int stage; 78 int stopme; 79 int pipefd[2]; /* {read,write} */ 80 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state; 81 pthread_t thread; 82 }; 83 84 #define WS " \r\n" 85 86 static TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq); 87 static pthread_mutex_t diskmtx; 88 static pthread_mutex_t confmtx; 89 90 static void *service_thread(void *data); 91 static void *udev_thread(void *data); 92 static void *autoconn_thread(void *data); 93 static void master_reconnect(const char *mntpt); 94 static void disk_reconnect(const char *disk); 95 static void disk_disconnect(void *handle); 96 static void udev_check_disks(void); 97 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged); 98 static void *hammer2_volconf_thread(void *info); 99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom); 100 static void hammer2_volconf_start(hammer2_media_config_t *conf, 101 const char *hostname); 102 static void hammer2_volconf_stop(hammer2_media_config_t *conf); 103 104 105 static void xdisk_connect(void); 106 107 /* 108 * Start-up the master listener daemon for the machine. This daemon runs 109 * a UDP discovery protocol, a TCP rendezvous, and scans certain files 110 * and directories for work. 111 * 112 * -- 113 * 114 * The only purpose for the UDP discovery protocol is to determine what 115 * other IPs on the LAN are running the hammer2 service daemon. DNS is not 116 * required to operate, but hostnames (if assigned) must be unique. If 117 * no hostname is assigned the host's IP is used as the name. This name 118 * is broadcast along with the mtime of the originator's private key. 119 * 120 * Receiving hammer2 service daemons which are able to match the label against 121 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection 122 * to the target. Removal of the file will cause a disconnection. A failed 123 * public key negotiation stops further connection attempts until either the 124 * file is updated or the remote mtime is updated. 125 * 126 * Generally speaking this results in a web of connections, typically a 127 * combination of point-to-point for the more important links and relayed 128 * (spanning tree) for less important or filtered links. 129 * 130 * -- 131 * 132 * The TCP listener serves as a rendezvous point in the cluster, accepting 133 * connections, performing registrations and authentications, maintaining 134 * the spanning tree, and keeping track of message state so disconnects can 135 * be handled properly. 136 * 137 * Once authenticated only low-level messaging protocols (which includes 138 * tracking persistent messages) are handled by this daemon. This daemon 139 * does not run the higher level quorum or locking protocols. 140 * 141 * -- 142 * 143 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets 144 * to connect to (which do not have to be on the local lan). This list will 145 * be retried until a connection can be established. The file is not usually 146 * needed for linkages local to the LAN. 147 */ 148 int 149 cmd_service(void) 150 { 151 struct sockaddr_in lsin; 152 int on; 153 int lfd; 154 155 /* 156 * Acquire socket and set options 157 */ 158 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 159 fprintf(stderr, "master_listen: socket(): %s\n", 160 strerror(errno)); 161 return 1; 162 } 163 on = 1; 164 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 165 166 /* 167 * Setup listen port and try to bind. If the bind fails we assume 168 * that a master listener process is already running and silently 169 * fail. 170 */ 171 bzero(&lsin, sizeof(lsin)); 172 lsin.sin_family = AF_INET; 173 lsin.sin_addr.s_addr = INADDR_ANY; 174 lsin.sin_port = htons(DMSG_LISTEN_PORT); 175 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { 176 close(lfd); 177 if (QuietOpt == 0) { 178 fprintf(stderr, 179 "master listen: daemon already running\n"); 180 } 181 return 0; 182 } 183 if (QuietOpt == 0) 184 fprintf(stderr, "master listen: startup\n"); 185 listen(lfd, 50); 186 187 /* 188 * Fork and disconnect the controlling terminal and parent process, 189 * executing the specified function as a pthread. 190 * 191 * Returns to the original process which can then continue running. 192 * In debug mode this call will create the pthread without forking 193 * and set NormalExit to 0, instead of fork. 194 */ 195 hammer2_demon(service_thread, (void *)(intptr_t)lfd); 196 if (NormalExit) 197 close(lfd); 198 return 0; 199 } 200 201 /* 202 * Master listen/accept thread. Accept connections on the master socket, 203 * starting a pthread for each one. 204 */ 205 static 206 void * 207 service_thread(void *data) 208 { 209 struct sockaddr_in asin; 210 socklen_t alen; 211 pthread_t thread; 212 dmsg_master_service_info_t *info; 213 int lfd = (int)(intptr_t)data; 214 int fd; 215 int i; 216 int count; 217 int opt; 218 struct statfs *mntbuf = NULL; 219 struct statvfs *mntvbuf = NULL; 220 221 /* 222 * Nobody waits for us 223 */ 224 setproctitle("hammer2 master listen"); 225 pthread_detach(pthread_self()); 226 227 /* 228 * Start up a thread to handle block device monitoring for 229 * export to the cluster. 230 */ 231 pthread_create(&thread, NULL, udev_thread, NULL); 232 233 /* 234 * Start up a thread to tie /dev/xdisk into the cluster 235 * controller. 236 */ 237 xdisk_connect(); 238 239 /* 240 * Start thread to manage /etc/hammer2/autoconn 241 */ 242 pthread_create(&thread, NULL, autoconn_thread, NULL); 243 244 /* 245 * Scan existing hammer2 mounts and reconnect to them using 246 * HAMMER2IOC_RECLUSTER. 247 */ 248 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT); 249 for (i = 0; i < count; ++i) { 250 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0) 251 master_reconnect(mntbuf[i].f_mntonname); 252 } 253 254 /* 255 * Accept connections and create pthreads to handle them after 256 * validating the IP. 257 */ 258 for (;;) { 259 alen = sizeof(asin); 260 fd = accept(lfd, (struct sockaddr *)&asin, &alen); 261 if (fd < 0) { 262 if (errno == EINTR) 263 continue; 264 break; 265 } 266 opt = 1; 267 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt); 268 fprintf(stderr, "service_thread: accept fd %d\n", fd); 269 info = malloc(sizeof(*info)); 270 bzero(info, sizeof(*info)); 271 info->fd = fd; 272 info->detachme = 1; 273 info->usrmsg_callback = hammer2_usrmsg_handler; 274 info->label = strdup("client"); 275 pthread_create(&thread, NULL, dmsg_master_service, info); 276 } 277 return (NULL); 278 } 279 280 /* 281 * Handle/Monitor the dmsg stream. If unmanaged is set we are responsible 282 * for responding for the message, otherwise if it is not set libdmsg has 283 * already done some preprocessing and will respond to the message for us 284 * when we return. 285 * 286 * We primarily monitor for VOLCONFs 287 */ 288 static 289 void 290 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged) 291 { 292 dmsg_state_t *state; 293 hammer2_media_config_t *conf; 294 dmsg_lnk_hammer2_volconf_t *msgconf; 295 int i; 296 297 /* 298 * Only process messages which are part of a LNK_CONN stream 299 */ 300 state = msg->state; 301 if (state == NULL || 302 (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) { 303 hammer2_shell_parse(msg, unmanaged); 304 return; 305 } 306 307 switch(msg->tcmd) { 308 case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE: 309 case DMSG_LNK_CONN | DMSGF_DELETE: 310 case DMSG_LNK_ERROR | DMSGF_DELETE: 311 /* 312 * Deleting connection, clean out all volume configs 313 */ 314 if (state->media == NULL || state->media->usrhandle == NULL) 315 break; 316 conf = state->media->usrhandle; 317 fprintf(stderr, "Shutting down media spans\n"); 318 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { 319 if (conf[i].thread) { 320 conf[i].ctl = H2CONFCTL_STOP; 321 pthread_cond_signal(&conf[i].cond); 322 } 323 } 324 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { 325 if (conf[i].thread) { 326 pthread_join(conf[i].thread, NULL); 327 conf->thread_started = 0; 328 pthread_cond_destroy(&conf[i].cond); 329 } 330 } 331 state->media->usrhandle = NULL; 332 free(conf); 333 break; 334 case DMSG_LNK_HAMMER2_VOLCONF: 335 /* 336 * One-way volume-configuration message is transmitted 337 * over the open LNK_CONN transaction. 338 */ 339 fprintf(stderr, "RECEIVED VOLCONF\n"); 340 341 if ((conf = state->media->usrhandle) == NULL) { 342 conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT); 343 bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT); 344 state->media->usrhandle = conf; 345 } 346 msgconf = H2_LNK_VOLCONF(msg); 347 348 if (msgconf->index < 0 || 349 msgconf->index >= HAMMER2_COPYID_COUNT) { 350 fprintf(stderr, 351 "VOLCONF: ILLEGAL INDEX %d\n", 352 msgconf->index); 353 break; 354 } 355 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 || 356 msgconf->copy.path[0] == 0) { 357 fprintf(stderr, 358 "VOLCONF: ILLEGAL PATH %d\n", 359 msgconf->index); 360 break; 361 } 362 conf += msgconf->index; 363 pthread_mutex_lock(&confmtx); 364 conf->copy_pend = msgconf->copy; 365 conf->ctl |= H2CONFCTL_UPDATE; 366 pthread_mutex_unlock(&confmtx); 367 if (conf->thread_started == 0) { 368 fprintf(stderr, "VOLCONF THREAD STARTED\n"); 369 pthread_cond_init(&conf->cond, NULL); 370 pthread_create(&conf->thread, NULL, 371 hammer2_volconf_thread, (void *)conf); 372 conf->thread_started = 1; 373 } 374 pthread_cond_signal(&conf->cond); 375 break; 376 default: 377 if (unmanaged) 378 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP); 379 break; 380 } 381 } 382 383 static void * 384 hammer2_volconf_thread(void *info) 385 { 386 hammer2_media_config_t *conf = info; 387 388 setproctitle("hammer2 volconf"); 389 390 pthread_mutex_lock(&confmtx); 391 while ((conf->ctl & H2CONFCTL_STOP) == 0) { 392 if (conf->ctl & H2CONFCTL_UPDATE) { 393 fprintf(stderr, "VOLCONF UPDATE\n"); 394 conf->ctl &= ~H2CONFCTL_UPDATE; 395 if (bcmp(&conf->copy_run, &conf->copy_pend, 396 sizeof(conf->copy_run)) == 0) { 397 fprintf(stderr, "VOLCONF: no changes\n"); 398 continue; 399 } 400 /* 401 * XXX TODO - auto reconnect on lookup failure or 402 * connect failure or stream failure. 403 */ 404 405 pthread_mutex_unlock(&confmtx); 406 hammer2_volconf_stop(conf); 407 conf->copy_run = conf->copy_pend; 408 if (conf->copy_run.copyid != 0 && 409 strncmp((char*)conf->copy_run.path, "span:", 5) == 0) { 410 hammer2_volconf_start(conf, 411 (char*)conf->copy_run.path + 5); 412 } 413 pthread_mutex_lock(&confmtx); 414 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state); 415 } 416 if (conf->state == H2MC_CONNECT) { 417 hammer2_volconf_start(conf, (char*)conf->copy_run.path + 5); 418 pthread_mutex_unlock(&confmtx); 419 sleep(5); 420 pthread_mutex_lock(&confmtx); 421 } else { 422 pthread_cond_wait(&conf->cond, &confmtx); 423 } 424 } 425 pthread_mutex_unlock(&confmtx); 426 hammer2_volconf_stop(conf); 427 return(NULL); 428 } 429 430 static 431 void 432 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname) 433 { 434 dmsg_master_service_info_t *info; 435 436 switch(conf->state) { 437 case H2MC_STOPPED: 438 case H2MC_CONNECT: 439 conf->fd = dmsg_connect(hostname); 440 if (conf->fd < 0) { 441 fprintf(stderr, "Unable to connect to %s\n", hostname); 442 conf->state = H2MC_CONNECT; 443 } else if (pipe(conf->pipefd) < 0) { 444 close(conf->fd); 445 fprintf(stderr, "pipe() failed during volconf\n"); 446 conf->state = H2MC_CONNECT; 447 } else { 448 fprintf(stderr, "VOLCONF CONNECT\n"); 449 info = malloc(sizeof(*info)); 450 bzero(info, sizeof(*info)); 451 info->fd = conf->fd; 452 info->altfd = conf->pipefd[0]; 453 info->altmsg_callback = hammer2_volconf_signal; 454 info->usrmsg_callback = hammer2_usrmsg_handler; 455 info->detachme = 0; 456 conf->state = H2MC_RUNNING; 457 pthread_create(&conf->iocom_thread, NULL, 458 dmsg_master_service, info); 459 } 460 break; 461 case H2MC_RUNNING: 462 break; 463 } 464 } 465 466 static 467 void 468 hammer2_volconf_stop(hammer2_media_config_t *conf) 469 { 470 switch(conf->state) { 471 case H2MC_STOPPED: 472 break; 473 case H2MC_CONNECT: 474 conf->state = H2MC_STOPPED; 475 break; 476 case H2MC_RUNNING: 477 close(conf->pipefd[1]); 478 conf->pipefd[1] = -1; 479 pthread_join(conf->iocom_thread, NULL); 480 conf->state = H2MC_STOPPED; 481 break; 482 } 483 } 484 485 static 486 void 487 hammer2_volconf_signal(dmsg_iocom_t *iocom) 488 { 489 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 490 } 491 492 /* 493 * Monitor block devices. Currently polls every ~10 seconds or so. 494 */ 495 static 496 void * 497 udev_thread(void *data __unused) 498 { 499 int fd; 500 int seq = 0; 501 502 pthread_detach(pthread_self()); 503 setproctitle("hammer2 udev_thread"); 504 505 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) { 506 fprintf(stderr, "udev_thread: unable to open \"%s\"\n", 507 UDEV_DEVICE_PATH); 508 pthread_exit(NULL); 509 } 510 udev_check_disks(); 511 while (ioctl(fd, UDEVWAIT, &seq) == 0) { 512 udev_check_disks(); 513 sleep(1); 514 } 515 return (NULL); 516 } 517 518 static void *autoconn_connect_thread(void *data); 519 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom); 520 521 static 522 void * 523 autoconn_thread(void *data __unused) 524 { 525 TAILQ_HEAD(, autoconn) autolist; 526 struct autoconn *ac; 527 struct autoconn *next; 528 pthread_t thread; 529 struct stat st; 530 time_t t; 531 time_t lmod; 532 int found_last; 533 FILE *fp; 534 char buf[256]; 535 536 TAILQ_INIT(&autolist); 537 found_last = 0; 538 lmod = 0; 539 540 pthread_detach(pthread_self()); 541 setproctitle("hammer2 autoconn_thread"); 542 for (;;) { 543 /* 544 * Polling interval 545 */ 546 sleep(5); 547 548 /* 549 * Poll the file. Loop up if the synchronized state (lmod) 550 * has not changed. 551 */ 552 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) { 553 if (lmod == st.st_mtime) 554 continue; 555 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r"); 556 if (fp == NULL) 557 continue; 558 } else { 559 if (lmod == 0) 560 continue; 561 fp = NULL; 562 } 563 564 /* 565 * Wait at least 5 seconds after the file is created or 566 * removed. 567 * 568 * Do not update the synchronized state. 569 */ 570 if (fp == NULL && found_last) { 571 found_last = 0; 572 continue; 573 } else if (fp && found_last == 0) { 574 fclose(fp); 575 found_last = 1; 576 continue; 577 } 578 579 /* 580 * Don't scan the file until the time progresses past the 581 * file's mtime, so we can validate that the file was not 582 * further modified during our scan. 583 * 584 * Do not update the synchronized state. 585 */ 586 time(&t); 587 if (fp) { 588 if (t == st.st_mtime) { 589 fclose(fp); 590 continue; 591 } 592 t = st.st_mtime; 593 } else { 594 t = 0; 595 } 596 597 /* 598 * Set staging to disconnect, then scan the file. 599 */ 600 TAILQ_FOREACH(ac, &autolist, entry) 601 ac->stage = 0; 602 while (fp && fgets(buf, sizeof(buf), fp) != NULL) { 603 char *host; 604 605 if ((host = strtok(buf, " \t\r\n")) == NULL || 606 host[0] == '#') { 607 continue; 608 } 609 TAILQ_FOREACH(ac, &autolist, entry) { 610 if (strcmp(host, ac->host) == 0) 611 break; 612 } 613 if (ac == NULL) { 614 ac = malloc(sizeof(*ac)); 615 bzero(ac, sizeof(*ac)); 616 ac->host = strdup(host); 617 ac->state = AUTOCONN_INACTIVE; 618 TAILQ_INSERT_TAIL(&autolist, ac, entry); 619 } 620 ac->stage = 1; 621 } 622 623 /* 624 * Ignore the scan (and retry again) if the file was 625 * modified during the scan. 626 * 627 * Do not update the synchronized state. 628 */ 629 if (fp) { 630 if (fstat(fileno(fp), &st) < 0) { 631 fclose(fp); 632 continue; 633 } 634 fclose(fp); 635 if (t != st.st_mtime) 636 continue; 637 } 638 639 /* 640 * Update the synchronized state and reconfigure the 641 * connect list as needed. 642 */ 643 lmod = t; 644 next = TAILQ_FIRST(&autolist); 645 while ((ac = next) != NULL) { 646 next = TAILQ_NEXT(ac, entry); 647 648 /* 649 * Staging, initiate 650 */ 651 if (ac->stage && ac->state == AUTOCONN_INACTIVE) { 652 if (pipe(ac->pipefd) == 0) { 653 ac->stopme = 0; 654 ac->state = AUTOCONN_ACTIVE; 655 pthread_create(&thread, NULL, 656 autoconn_connect_thread, 657 ac); 658 } 659 } 660 661 /* 662 * Unstaging, stop active connection. 663 * 664 * We write to the pipe which causes the iocom_core 665 * to call autoconn_disconnect_signal(). 666 */ 667 if (ac->stage == 0 && 668 ac->state == AUTOCONN_ACTIVE) { 669 if (ac->stopme == 0) { 670 char dummy = 0; 671 ac->stopme = 1; 672 write(ac->pipefd[1], &dummy, 1); 673 } 674 } 675 676 /* 677 * Unstaging, delete inactive connection. 678 */ 679 if (ac->stage == 0 && 680 ac->state == AUTOCONN_INACTIVE) { 681 TAILQ_REMOVE(&autolist, ac, entry); 682 free(ac->host); 683 free(ac); 684 continue; 685 } 686 } 687 sleep(5); 688 } 689 return(NULL); 690 } 691 692 static 693 void * 694 autoconn_connect_thread(void *data) 695 { 696 dmsg_master_service_info_t *info; 697 struct autoconn *ac; 698 void *res; 699 int fd; 700 701 ac = data; 702 pthread_detach(pthread_self()); 703 setproctitle("hammer2 dmsg"); 704 705 while (ac->stopme == 0) { 706 fd = dmsg_connect(ac->host); 707 if (fd < 0) { 708 if (DMsgDebugOpt > 2) { 709 fprintf(stderr, 710 "autoconn: Connect failure: %s\n", 711 ac->host); 712 } 713 sleep(5); 714 continue; 715 } 716 fprintf(stderr, "autoconn: Connect %s\n", ac->host); 717 718 info = malloc(sizeof(*info)); 719 bzero(info, sizeof(*info)); 720 info->fd = fd; 721 info->altfd = ac->pipefd[0]; 722 info->altmsg_callback = autoconn_disconnect_signal; 723 info->usrmsg_callback = hammer2_usrmsg_handler; 724 info->detachme = 0; 725 info->noclosealt = 1; 726 pthread_create(&ac->thread, NULL, dmsg_master_service, info); 727 pthread_join(ac->thread, &res); 728 } 729 close(ac->pipefd[0]); 730 ac->state = AUTOCONN_INACTIVE; 731 /* auto structure can be ripped out here */ 732 return(NULL); 733 } 734 735 static 736 void 737 autoconn_disconnect_signal(dmsg_iocom_t *iocom) 738 { 739 fprintf(stderr, "autoconn: Shutting down socket\n"); 740 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 741 } 742 743 /* 744 * Retrieve the list of disk attachments and attempt to export 745 * them. 746 */ 747 static 748 void 749 udev_check_disks(void) 750 { 751 char tmpbuf[1024]; 752 char *buf = NULL; 753 char *disk; 754 int error; 755 size_t n; 756 757 for (;;) { 758 n = 0; 759 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0); 760 if (error < 0 || n == 0) 761 break; 762 if (n >= sizeof(tmpbuf)) 763 buf = malloc(n + 1); 764 else 765 buf = tmpbuf; 766 error = sysctlbyname("kern.disks", buf, &n, NULL, 0); 767 if (error == 0) { 768 buf[n] = 0; 769 break; 770 } 771 if (buf != tmpbuf) { 772 free(buf); 773 buf = NULL; 774 } 775 if (errno != ENOMEM) 776 break; 777 } 778 if (buf) { 779 fprintf(stderr, "DISKS: %s\n", buf); 780 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) { 781 disk_reconnect(disk); 782 } 783 if (buf != tmpbuf) 784 free(buf); 785 } 786 } 787 788 /* 789 * Normally the mount program supplies a cluster communications 790 * descriptor to the hammer2 vfs on mount, but if you kill the service 791 * daemon and restart it that link will be lost. 792 * 793 * This procedure attempts to [re]connect to existing mounts when 794 * the service daemon is started up before going into its accept 795 * loop. 796 * 797 * NOTE: A hammer2 mount point can only accomodate one connection at a time 798 * so this will disconnect any existing connection during the 799 * reconnect. 800 */ 801 static 802 void 803 master_reconnect(const char *mntpt) 804 { 805 struct hammer2_ioc_recluster recls; 806 dmsg_master_service_info_t *info; 807 pthread_t thread; 808 int fd; 809 int pipefds[2]; 810 811 fd = open(mntpt, O_RDONLY); 812 if (fd < 0) { 813 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt); 814 return; 815 } 816 if (pipe(pipefds) < 0) { 817 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt); 818 close(fd); 819 return; 820 } 821 bzero(&recls, sizeof(recls)); 822 recls.fd = pipefds[0]; 823 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) { 824 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt); 825 close(pipefds[0]); 826 close(pipefds[1]); 827 close(fd); 828 return; 829 } 830 close(pipefds[0]); 831 close(fd); 832 833 info = malloc(sizeof(*info)); 834 bzero(info, sizeof(*info)); 835 info->fd = pipefds[1]; 836 info->detachme = 1; 837 info->usrmsg_callback = hammer2_usrmsg_handler; 838 info->label = strdup("hammer2"); 839 pthread_create(&thread, NULL, dmsg_master_service, info); 840 } 841 842 /* 843 * Reconnect a physical disk service to the mesh. 844 */ 845 static 846 void 847 disk_reconnect(const char *disk) 848 { 849 struct disk_ioc_recluster recls; 850 struct diskcon *dc; 851 dmsg_master_service_info_t *info; 852 pthread_t thread; 853 int fd; 854 int pipefds[2]; 855 char *path; 856 857 /* 858 * Urm, this will auto-create mdX+1, just ignore for now. 859 * This mechanic needs to be fixed. It might actually be nice 860 * to be able to export md disks. 861 */ 862 if (strncmp(disk, "md", 2) == 0) 863 return; 864 if (strncmp(disk, "xa", 2) == 0) 865 return; 866 867 /* 868 * Check if already connected 869 */ 870 pthread_mutex_lock(&diskmtx); 871 TAILQ_FOREACH(dc, &diskconq, entry) { 872 if (strcmp(dc->disk, disk) == 0) 873 break; 874 } 875 pthread_mutex_unlock(&diskmtx); 876 if (dc) 877 return; 878 879 /* 880 * Not already connected, create a connection to the kernel 881 * disk driver. 882 */ 883 asprintf(&path, "/dev/%s", disk); 884 fd = open(path, O_RDONLY); 885 if (fd < 0) { 886 fprintf(stderr, "reconnect %s: no access to disk\n", disk); 887 free(path); 888 return; 889 } 890 free(path); 891 if (pipe(pipefds) < 0) { 892 fprintf(stderr, "reconnect %s: pipe() failed\n", disk); 893 close(fd); 894 return; 895 } 896 bzero(&recls, sizeof(recls)); 897 recls.fd = pipefds[0]; 898 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) { 899 fprintf(stderr, "reconnect %s: ioctl failed\n", disk); 900 close(pipefds[0]); 901 close(pipefds[1]); 902 close(fd); 903 return; 904 } 905 close(pipefds[0]); 906 close(fd); 907 908 dc = malloc(sizeof(*dc)); 909 dc->disk = strdup(disk); 910 pthread_mutex_lock(&diskmtx); 911 TAILQ_INSERT_TAIL(&diskconq, dc, entry); 912 pthread_mutex_unlock(&diskmtx); 913 914 info = malloc(sizeof(*info)); 915 bzero(info, sizeof(*info)); 916 info->fd = pipefds[1]; 917 info->detachme = 1; 918 info->usrmsg_callback = hammer2_usrmsg_handler; 919 info->exit_callback = disk_disconnect; 920 info->handle = dc; 921 info->label = strdup(dc->disk); 922 pthread_create(&thread, NULL, dmsg_master_service, info); 923 } 924 925 static 926 void 927 disk_disconnect(void *handle) 928 { 929 struct diskcon *dc = handle; 930 931 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk); 932 933 pthread_mutex_lock(&diskmtx); 934 TAILQ_REMOVE(&diskconq, dc, entry); 935 pthread_mutex_unlock(&diskmtx); 936 free(dc->disk); 937 free(dc); 938 } 939 940 /* 941 * Connect our cluster controller to /dev/xdisk. xdisk will pick up 942 * SPAN messages that we route to it, makes remote block devices 943 * available to the host, and can issue dmsg transactions based on 944 * device requests. 945 */ 946 static 947 void 948 xdisk_connect(void) 949 { 950 dmsg_master_service_info_t *info; 951 struct xdisk_attach_ioctl xaioc; 952 pthread_t thread; 953 int pipefds[2]; 954 int xfd; 955 int error; 956 957 /* 958 * Is /dev/xdisk available? 959 */ 960 xfd = open("/dev/xdisk", O_RDWR, 0600); 961 if (xfd < 0) { 962 fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n"); 963 return; 964 } 965 966 if (pipe(pipefds) < 0) { 967 fprintf(stderr, "xdisk_connect: pipe() failed\n"); 968 return; 969 } 970 971 /* 972 * Pipe between cluster controller (this user process). 973 */ 974 info = malloc(sizeof(*info)); 975 bzero(info, sizeof(*info)); 976 info->fd = pipefds[1]; 977 info->detachme = 1; 978 info->usrmsg_callback = hammer2_usrmsg_handler; 979 info->exit_callback = NULL; 980 pthread_create(&thread, NULL, dmsg_master_service, info); 981 982 /* 983 * And the xdisk device. 984 */ 985 bzero(&xaioc, sizeof(xaioc)); 986 xaioc.fd = pipefds[0]; 987 error = ioctl(xfd, XDISKIOCATTACH, &xaioc); 988 close(pipefds[0]); 989 close(xfd); 990 991 if (error < 0) { 992 fprintf(stderr, 993 "xdisk_connect: cannot attach %s\n", 994 strerror(errno)); 995 return; 996 } 997 } 998 999 /* 1000 * Execute the specified function as a detached independent process/daemon, 1001 * unless we are in debug mode. If we are in debug mode the function is 1002 * executed as a pthread in the current process. 1003 */ 1004 void 1005 hammer2_demon(void *(*func)(void *), void *arg) 1006 { 1007 pthread_t thread; 1008 pid_t pid; 1009 int ttyfd; 1010 1011 /* 1012 * Do not disconnect in debug mode 1013 */ 1014 if (DebugOpt) { 1015 pthread_create(&thread, NULL, func, arg); 1016 NormalExit = 0; 1017 return; 1018 } 1019 1020 /* 1021 * Otherwise disconnect us. Double-fork to get rid of the ppid 1022 * association and disconnect the TTY. 1023 */ 1024 if ((pid = fork()) < 0) { 1025 fprintf(stderr, "hammer2: fork(): %s\n", strerror(errno)); 1026 exit(1); 1027 } 1028 if (pid > 0) { 1029 while (waitpid(pid, NULL, 0) != pid) 1030 ; 1031 return; /* parent returns */ 1032 } 1033 1034 /* 1035 * Get rid of the TTY/session before double-forking to finish off 1036 * the ppid. 1037 */ 1038 ttyfd = open("/dev/null", O_RDWR); 1039 if (ttyfd >= 0) { 1040 if (ttyfd != 0) 1041 dup2(ttyfd, 0); 1042 if (ttyfd != 1) 1043 dup2(ttyfd, 1); 1044 if (ttyfd != 2) 1045 dup2(ttyfd, 2); 1046 if (ttyfd > 2) 1047 close(ttyfd); 1048 } 1049 1050 ttyfd = open("/dev/tty", O_RDWR); 1051 if (ttyfd >= 0) { 1052 ioctl(ttyfd, TIOCNOTTY, 0); 1053 close(ttyfd); 1054 } 1055 setsid(); 1056 1057 /* 1058 * Second fork to disconnect ppid (the original parent waits for 1059 * us to exit). 1060 */ 1061 if ((pid = fork()) < 0) { 1062 _exit(2); 1063 } 1064 if (pid > 0) 1065 _exit(0); 1066 1067 /* 1068 * The double child 1069 */ 1070 setsid(); 1071 pthread_create(&thread, NULL, func, arg); 1072 pthread_exit(NULL); 1073 _exit(2); /* NOT REACHED */ 1074 } 1075