1 /*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40
41 struct hammer2_media_config {
42 hammer2_volconf_t copy_run;
43 hammer2_volconf_t copy_pend;
44 pthread_t thread;
45 int thread_started;
46 pthread_cond_t cond;
47 int ctl;
48 int fd;
49 int pipefd[2]; /* signal stop */
50 dmsg_iocom_t iocom;
51 pthread_t iocom_thread;
52 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
53 };
54
55 typedef struct hammer2_media_config hammer2_media_config_t;
56
57 #define H2CONFCTL_STOP 0x00000001
58 #define H2CONFCTL_UPDATE 0x00000002
59
60 struct diskcon {
61 TAILQ_ENTRY(diskcon) entry;
62 char *disk;
63 };
64
65 struct service_node_opaque {
66 char cl_label[64];
67 char fs_label[64];
68 dmsg_media_block_t block;
69 int attached;
70 int servicing;
71 int servicefd;
72 };
73
74 struct autoconn {
75 TAILQ_ENTRY(autoconn) entry;
76 char *host;
77 int stage;
78 int stopme;
79 int pipefd[2]; /* {read,write} */
80 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
81 pthread_t thread;
82 };
83
84 #define WS " \r\n"
85
86 static TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
87 static pthread_mutex_t diskmtx;
88 static pthread_mutex_t confmtx;
89
90 static void *service_thread(void *data);
91 static void *udev_thread(void *data);
92 static void *autoconn_thread(void *data);
93 static void master_reconnect(const char *mntpt);
94 static void disk_reconnect(const char *disk);
95 static void disk_disconnect(void *handle);
96 static void udev_check_disks(void);
97 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
98 static void *hammer2_volconf_thread(void *info);
99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
100 static void hammer2_volconf_start(hammer2_media_config_t *conf,
101 const char *hostname);
102 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
103
104
105 static void xdisk_connect(void);
106
107 /*
108 * Start-up the master listener daemon for the machine. This daemon runs
109 * a UDP discovery protocol, a TCP rendezvous, and scans certain files
110 * and directories for work.
111 *
112 * --
113 *
114 * The only purpose for the UDP discovery protocol is to determine what
115 * other IPs on the LAN are running the hammer2 service daemon. DNS is not
116 * required to operate, but hostnames (if assigned) must be unique. If
117 * no hostname is assigned the host's IP is used as the name. This name
118 * is broadcast along with the mtime of the originator's private key.
119 *
120 * Receiving hammer2 service daemons which are able to match the label against
121 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
122 * to the target. Removal of the file will cause a disconnection. A failed
123 * public key negotiation stops further connection attempts until either the
124 * file is updated or the remote mtime is updated.
125 *
126 * Generally speaking this results in a web of connections, typically a
127 * combination of point-to-point for the more important links and relayed
128 * (spanning tree) for less important or filtered links.
129 *
130 * --
131 *
132 * The TCP listener serves as a rendezvous point in the cluster, accepting
133 * connections, performing registrations and authentications, maintaining
134 * the spanning tree, and keeping track of message state so disconnects can
135 * be handled properly.
136 *
137 * Once authenticated only low-level messaging protocols (which includes
138 * tracking persistent messages) are handled by this daemon. This daemon
139 * does not run the higher level quorum or locking protocols.
140 *
141 * --
142 *
143 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
144 * to connect to (which do not have to be on the local lan). This list will
145 * be retried until a connection can be established. The file is not usually
146 * needed for linkages local to the LAN.
147 */
148 int
cmd_service(void)149 cmd_service(void)
150 {
151 struct sockaddr_in lsin;
152 int on;
153 int lfd;
154
155 /*
156 * Acquire socket and set options
157 */
158 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
159 fprintf(stderr, "master_listen: socket(): %s\n",
160 strerror(errno));
161 return 1;
162 }
163 on = 1;
164 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
165
166 /*
167 * Setup listen port and try to bind. If the bind fails we assume
168 * that a master listener process is already running and silently
169 * fail.
170 */
171 bzero(&lsin, sizeof(lsin));
172 lsin.sin_family = AF_INET;
173 lsin.sin_addr.s_addr = INADDR_ANY;
174 lsin.sin_port = htons(DMSG_LISTEN_PORT);
175 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
176 close(lfd);
177 if (QuietOpt == 0) {
178 fprintf(stderr,
179 "master listen: daemon already running\n");
180 }
181 return 0;
182 }
183 if (QuietOpt == 0)
184 fprintf(stderr, "master listen: startup\n");
185 listen(lfd, 50);
186
187 /*
188 * Fork and disconnect the controlling terminal and parent process,
189 * executing the specified function as a pthread.
190 *
191 * Returns to the original process which can then continue running.
192 * In debug mode this call will create the pthread without forking
193 * and set NormalExit to 0, instead of fork.
194 */
195 hammer2_demon(service_thread, (void *)(intptr_t)lfd);
196 if (NormalExit)
197 close(lfd);
198 return 0;
199 }
200
201 /*
202 * Master listen/accept thread. Accept connections on the master socket,
203 * starting a pthread for each one.
204 */
205 static
206 void *
service_thread(void * data)207 service_thread(void *data)
208 {
209 struct sockaddr_in asin;
210 socklen_t alen;
211 pthread_t thread;
212 dmsg_master_service_info_t *info;
213 int lfd = (int)(intptr_t)data;
214 int fd;
215 int i;
216 int count;
217 int opt;
218 struct statfs *mntbuf = NULL;
219 struct statvfs *mntvbuf = NULL;
220
221 /*
222 * Nobody waits for us
223 */
224 setproctitle("hammer2 master listen");
225 pthread_detach(pthread_self());
226
227 /*
228 * Start up a thread to handle block device monitoring for
229 * export to the cluster.
230 */
231 pthread_create(&thread, NULL, udev_thread, NULL);
232
233 /*
234 * Start up a thread to tie /dev/xdisk into the cluster
235 * controller.
236 */
237 xdisk_connect();
238
239 /*
240 * Start thread to manage /etc/hammer2/autoconn
241 */
242 pthread_create(&thread, NULL, autoconn_thread, NULL);
243
244 /*
245 * Scan existing hammer2 mounts and reconnect to them using
246 * HAMMER2IOC_RECLUSTER.
247 */
248 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
249 for (i = 0; i < count; ++i) {
250 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
251 master_reconnect(mntbuf[i].f_mntonname);
252 }
253
254 /*
255 * Accept connections and create pthreads to handle them after
256 * validating the IP.
257 */
258 for (;;) {
259 alen = sizeof(asin);
260 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
261 if (fd < 0) {
262 if (errno == EINTR)
263 continue;
264 break;
265 }
266 opt = 1;
267 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
268 fprintf(stderr, "service_thread: accept fd %d\n", fd);
269 info = malloc(sizeof(*info));
270 bzero(info, sizeof(*info));
271 info->fd = fd;
272 info->detachme = 1;
273 info->usrmsg_callback = hammer2_usrmsg_handler;
274 info->label = strdup("client");
275 pthread_create(&thread, NULL, dmsg_master_service, info);
276 }
277 return (NULL);
278 }
279
280 /*
281 * Handle/Monitor the dmsg stream. If unmanaged is set we are responsible
282 * for responding for the message, otherwise if it is not set libdmsg has
283 * already done some preprocessing and will respond to the message for us
284 * when we return.
285 *
286 * We primarily monitor for VOLCONFs
287 */
288 static
289 void
hammer2_usrmsg_handler(dmsg_msg_t * msg,int unmanaged)290 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
291 {
292 dmsg_state_t *state;
293 hammer2_media_config_t *conf;
294 dmsg_lnk_hammer2_volconf_t *msgconf;
295 int i;
296
297 /*
298 * Only process messages which are part of a LNK_CONN stream
299 */
300 state = msg->state;
301 if (state == NULL ||
302 (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
303 hammer2_shell_parse(msg, unmanaged);
304 return;
305 }
306
307 switch(msg->tcmd) {
308 case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
309 case DMSG_LNK_CONN | DMSGF_DELETE:
310 case DMSG_LNK_ERROR | DMSGF_DELETE:
311 /*
312 * Deleting connection, clean out all volume configs
313 */
314 if (state->media == NULL || state->media->usrhandle == NULL)
315 break;
316 conf = state->media->usrhandle;
317 fprintf(stderr, "Shutting down media spans\n");
318 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
319 if (conf[i].thread) {
320 conf[i].ctl = H2CONFCTL_STOP;
321 pthread_cond_signal(&conf[i].cond);
322 }
323 }
324 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
325 if (conf[i].thread) {
326 pthread_join(conf[i].thread, NULL);
327 conf->thread_started = 0;
328 pthread_cond_destroy(&conf[i].cond);
329 }
330 }
331 state->media->usrhandle = NULL;
332 free(conf);
333 break;
334 case DMSG_LNK_HAMMER2_VOLCONF:
335 /*
336 * One-way volume-configuration message is transmitted
337 * over the open LNK_CONN transaction.
338 */
339 fprintf(stderr, "RECEIVED VOLCONF\n");
340
341 if ((conf = state->media->usrhandle) == NULL) {
342 conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
343 bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
344 state->media->usrhandle = conf;
345 }
346 msgconf = H2_LNK_VOLCONF(msg);
347
348 if (msgconf->index < 0 ||
349 msgconf->index >= HAMMER2_COPYID_COUNT) {
350 fprintf(stderr,
351 "VOLCONF: ILLEGAL INDEX %d\n",
352 msgconf->index);
353 break;
354 }
355 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
356 msgconf->copy.path[0] == 0) {
357 fprintf(stderr,
358 "VOLCONF: ILLEGAL PATH %d\n",
359 msgconf->index);
360 break;
361 }
362 conf += msgconf->index;
363 pthread_mutex_lock(&confmtx);
364 conf->copy_pend = msgconf->copy;
365 conf->ctl |= H2CONFCTL_UPDATE;
366 pthread_mutex_unlock(&confmtx);
367 if (conf->thread_started == 0) {
368 fprintf(stderr, "VOLCONF THREAD STARTED\n");
369 pthread_cond_init(&conf->cond, NULL);
370 pthread_create(&conf->thread, NULL,
371 hammer2_volconf_thread, (void *)conf);
372 conf->thread_started = 1;
373 }
374 pthread_cond_signal(&conf->cond);
375 break;
376 default:
377 if (unmanaged)
378 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
379 break;
380 }
381 }
382
383 static void *
hammer2_volconf_thread(void * info)384 hammer2_volconf_thread(void *info)
385 {
386 hammer2_media_config_t *conf = info;
387
388 setproctitle("hammer2 volconf");
389
390 pthread_mutex_lock(&confmtx);
391 while ((conf->ctl & H2CONFCTL_STOP) == 0) {
392 if (conf->ctl & H2CONFCTL_UPDATE) {
393 fprintf(stderr, "VOLCONF UPDATE\n");
394 conf->ctl &= ~H2CONFCTL_UPDATE;
395 if (bcmp(&conf->copy_run, &conf->copy_pend,
396 sizeof(conf->copy_run)) == 0) {
397 fprintf(stderr, "VOLCONF: no changes\n");
398 continue;
399 }
400 /*
401 * XXX TODO - auto reconnect on lookup failure or
402 * connect failure or stream failure.
403 */
404
405 pthread_mutex_unlock(&confmtx);
406 hammer2_volconf_stop(conf);
407 conf->copy_run = conf->copy_pend;
408 if (conf->copy_run.copyid != 0 &&
409 strncmp((char*)conf->copy_run.path, "span:", 5) == 0) {
410 hammer2_volconf_start(conf,
411 (char*)conf->copy_run.path + 5);
412 }
413 pthread_mutex_lock(&confmtx);
414 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
415 }
416 if (conf->state == H2MC_CONNECT) {
417 hammer2_volconf_start(conf, (char*)conf->copy_run.path + 5);
418 pthread_mutex_unlock(&confmtx);
419 sleep(5);
420 pthread_mutex_lock(&confmtx);
421 } else {
422 pthread_cond_wait(&conf->cond, &confmtx);
423 }
424 }
425 pthread_mutex_unlock(&confmtx);
426 hammer2_volconf_stop(conf);
427 return(NULL);
428 }
429
430 static
431 void
hammer2_volconf_start(hammer2_media_config_t * conf,const char * hostname)432 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
433 {
434 dmsg_master_service_info_t *info;
435
436 switch(conf->state) {
437 case H2MC_STOPPED:
438 case H2MC_CONNECT:
439 conf->fd = dmsg_connect(hostname);
440 if (conf->fd < 0) {
441 fprintf(stderr, "Unable to connect to %s\n", hostname);
442 conf->state = H2MC_CONNECT;
443 } else if (pipe(conf->pipefd) < 0) {
444 close(conf->fd);
445 fprintf(stderr, "pipe() failed during volconf\n");
446 conf->state = H2MC_CONNECT;
447 } else {
448 fprintf(stderr, "VOLCONF CONNECT\n");
449 info = malloc(sizeof(*info));
450 bzero(info, sizeof(*info));
451 info->fd = conf->fd;
452 info->altfd = conf->pipefd[0];
453 info->altmsg_callback = hammer2_volconf_signal;
454 info->usrmsg_callback = hammer2_usrmsg_handler;
455 info->detachme = 0;
456 conf->state = H2MC_RUNNING;
457 pthread_create(&conf->iocom_thread, NULL,
458 dmsg_master_service, info);
459 }
460 break;
461 case H2MC_RUNNING:
462 break;
463 }
464 }
465
466 static
467 void
hammer2_volconf_stop(hammer2_media_config_t * conf)468 hammer2_volconf_stop(hammer2_media_config_t *conf)
469 {
470 switch(conf->state) {
471 case H2MC_STOPPED:
472 break;
473 case H2MC_CONNECT:
474 conf->state = H2MC_STOPPED;
475 break;
476 case H2MC_RUNNING:
477 close(conf->pipefd[1]);
478 conf->pipefd[1] = -1;
479 pthread_join(conf->iocom_thread, NULL);
480 conf->state = H2MC_STOPPED;
481 break;
482 }
483 }
484
485 static
486 void
hammer2_volconf_signal(dmsg_iocom_t * iocom)487 hammer2_volconf_signal(dmsg_iocom_t *iocom)
488 {
489 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
490 }
491
492 /*
493 * Monitor block devices. Currently polls every ~10 seconds or so.
494 */
495 static
496 void *
udev_thread(void * data __unused)497 udev_thread(void *data __unused)
498 {
499 int fd;
500 int seq = 0;
501
502 pthread_detach(pthread_self());
503 setproctitle("hammer2 udev_thread");
504
505 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
506 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
507 UDEV_DEVICE_PATH);
508 pthread_exit(NULL);
509 }
510 udev_check_disks();
511 while (ioctl(fd, UDEVWAIT, &seq) == 0) {
512 udev_check_disks();
513 sleep(1);
514 }
515 return (NULL);
516 }
517
518 static void *autoconn_connect_thread(void *data);
519 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
520
521 static
522 void *
autoconn_thread(void * data __unused)523 autoconn_thread(void *data __unused)
524 {
525 TAILQ_HEAD(, autoconn) autolist;
526 struct autoconn *ac;
527 struct autoconn *next;
528 pthread_t thread;
529 struct stat st;
530 time_t t;
531 time_t lmod;
532 int found_last;
533 FILE *fp;
534 char buf[256];
535
536 TAILQ_INIT(&autolist);
537 found_last = 0;
538 lmod = 0;
539
540 pthread_detach(pthread_self());
541 setproctitle("hammer2 autoconn_thread");
542 for (;;) {
543 /*
544 * Polling interval
545 */
546 sleep(5);
547
548 /*
549 * Poll the file. Loop up if the synchronized state (lmod)
550 * has not changed.
551 */
552 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
553 if (lmod == st.st_mtime)
554 continue;
555 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
556 if (fp == NULL)
557 continue;
558 } else {
559 if (lmod == 0)
560 continue;
561 fp = NULL;
562 }
563
564 /*
565 * Wait at least 5 seconds after the file is created or
566 * removed.
567 *
568 * Do not update the synchronized state.
569 */
570 if (fp == NULL && found_last) {
571 found_last = 0;
572 continue;
573 } else if (fp && found_last == 0) {
574 fclose(fp);
575 found_last = 1;
576 continue;
577 }
578
579 /*
580 * Don't scan the file until the time progresses past the
581 * file's mtime, so we can validate that the file was not
582 * further modified during our scan.
583 *
584 * Do not update the synchronized state.
585 */
586 time(&t);
587 if (fp) {
588 if (t == st.st_mtime) {
589 fclose(fp);
590 continue;
591 }
592 t = st.st_mtime;
593 } else {
594 t = 0;
595 }
596
597 /*
598 * Set staging to disconnect, then scan the file.
599 */
600 TAILQ_FOREACH(ac, &autolist, entry)
601 ac->stage = 0;
602 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
603 char *host;
604
605 if ((host = strtok(buf, " \t\r\n")) == NULL ||
606 host[0] == '#') {
607 continue;
608 }
609 TAILQ_FOREACH(ac, &autolist, entry) {
610 if (strcmp(host, ac->host) == 0)
611 break;
612 }
613 if (ac == NULL) {
614 ac = malloc(sizeof(*ac));
615 bzero(ac, sizeof(*ac));
616 ac->host = strdup(host);
617 ac->state = AUTOCONN_INACTIVE;
618 TAILQ_INSERT_TAIL(&autolist, ac, entry);
619 }
620 ac->stage = 1;
621 }
622
623 /*
624 * Ignore the scan (and retry again) if the file was
625 * modified during the scan.
626 *
627 * Do not update the synchronized state.
628 */
629 if (fp) {
630 if (fstat(fileno(fp), &st) < 0) {
631 fclose(fp);
632 continue;
633 }
634 fclose(fp);
635 if (t != st.st_mtime)
636 continue;
637 }
638
639 /*
640 * Update the synchronized state and reconfigure the
641 * connect list as needed.
642 */
643 lmod = t;
644 next = TAILQ_FIRST(&autolist);
645 while ((ac = next) != NULL) {
646 next = TAILQ_NEXT(ac, entry);
647
648 /*
649 * Staging, initiate
650 */
651 if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
652 if (pipe(ac->pipefd) == 0) {
653 ac->stopme = 0;
654 ac->state = AUTOCONN_ACTIVE;
655 pthread_create(&thread, NULL,
656 autoconn_connect_thread,
657 ac);
658 }
659 }
660
661 /*
662 * Unstaging, stop active connection.
663 *
664 * We write to the pipe which causes the iocom_core
665 * to call autoconn_disconnect_signal().
666 */
667 if (ac->stage == 0 &&
668 ac->state == AUTOCONN_ACTIVE) {
669 if (ac->stopme == 0) {
670 char dummy = 0;
671 ac->stopme = 1;
672 write(ac->pipefd[1], &dummy, 1);
673 }
674 }
675
676 /*
677 * Unstaging, delete inactive connection.
678 */
679 if (ac->stage == 0 &&
680 ac->state == AUTOCONN_INACTIVE) {
681 TAILQ_REMOVE(&autolist, ac, entry);
682 free(ac->host);
683 free(ac);
684 continue;
685 }
686 }
687 sleep(5);
688 }
689 return(NULL);
690 }
691
692 static
693 void *
autoconn_connect_thread(void * data)694 autoconn_connect_thread(void *data)
695 {
696 dmsg_master_service_info_t *info;
697 struct autoconn *ac;
698 void *res;
699 int fd;
700
701 ac = data;
702 pthread_detach(pthread_self());
703 setproctitle("hammer2 dmsg");
704
705 while (ac->stopme == 0) {
706 fd = dmsg_connect(ac->host);
707 if (fd < 0) {
708 if (DMsgDebugOpt > 2) {
709 fprintf(stderr,
710 "autoconn: Connect failure: %s\n",
711 ac->host);
712 }
713 sleep(5);
714 continue;
715 }
716 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
717
718 info = malloc(sizeof(*info));
719 bzero(info, sizeof(*info));
720 info->fd = fd;
721 info->altfd = ac->pipefd[0];
722 info->altmsg_callback = autoconn_disconnect_signal;
723 info->usrmsg_callback = hammer2_usrmsg_handler;
724 info->detachme = 0;
725 info->noclosealt = 1;
726 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
727 pthread_join(ac->thread, &res);
728 }
729 close(ac->pipefd[0]);
730 ac->state = AUTOCONN_INACTIVE;
731 /* auto structure can be ripped out here */
732 return(NULL);
733 }
734
735 static
736 void
autoconn_disconnect_signal(dmsg_iocom_t * iocom)737 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
738 {
739 fprintf(stderr, "autoconn: Shutting down socket\n");
740 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
741 }
742
743 /*
744 * Retrieve the list of disk attachments and attempt to export
745 * them.
746 */
747 static
748 void
udev_check_disks(void)749 udev_check_disks(void)
750 {
751 char tmpbuf[1024];
752 char *buf = NULL;
753 char *disk;
754 int error;
755 size_t n;
756
757 for (;;) {
758 n = 0;
759 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
760 if (error < 0 || n == 0)
761 break;
762 if (n >= sizeof(tmpbuf))
763 buf = malloc(n + 1);
764 else
765 buf = tmpbuf;
766 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
767 if (error == 0) {
768 buf[n] = 0;
769 break;
770 }
771 if (buf != tmpbuf) {
772 free(buf);
773 buf = NULL;
774 }
775 if (errno != ENOMEM)
776 break;
777 }
778 if (buf) {
779 fprintf(stderr, "DISKS: %s\n", buf);
780 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
781 disk_reconnect(disk);
782 }
783 if (buf != tmpbuf)
784 free(buf);
785 }
786 }
787
788 /*
789 * Normally the mount program supplies a cluster communications
790 * descriptor to the hammer2 vfs on mount, but if you kill the service
791 * daemon and restart it that link will be lost.
792 *
793 * This procedure attempts to [re]connect to existing mounts when
794 * the service daemon is started up before going into its accept
795 * loop.
796 *
797 * NOTE: A hammer2 mount point can only accomodate one connection at a time
798 * so this will disconnect any existing connection during the
799 * reconnect.
800 */
801 static
802 void
master_reconnect(const char * mntpt)803 master_reconnect(const char *mntpt)
804 {
805 struct hammer2_ioc_recluster recls;
806 dmsg_master_service_info_t *info;
807 pthread_t thread;
808 int fd;
809 int pipefds[2];
810
811 fd = open(mntpt, O_RDONLY);
812 if (fd < 0) {
813 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
814 return;
815 }
816 if (pipe(pipefds) < 0) {
817 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
818 close(fd);
819 return;
820 }
821 bzero(&recls, sizeof(recls));
822 recls.fd = pipefds[0];
823 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
824 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
825 close(pipefds[0]);
826 close(pipefds[1]);
827 close(fd);
828 return;
829 }
830 close(pipefds[0]);
831 close(fd);
832
833 info = malloc(sizeof(*info));
834 bzero(info, sizeof(*info));
835 info->fd = pipefds[1];
836 info->detachme = 1;
837 info->usrmsg_callback = hammer2_usrmsg_handler;
838 info->label = strdup("hammer2");
839 pthread_create(&thread, NULL, dmsg_master_service, info);
840 }
841
842 /*
843 * Reconnect a physical disk service to the mesh.
844 */
845 static
846 void
disk_reconnect(const char * disk)847 disk_reconnect(const char *disk)
848 {
849 struct disk_ioc_recluster recls;
850 struct diskcon *dc;
851 dmsg_master_service_info_t *info;
852 pthread_t thread;
853 int fd;
854 int pipefds[2];
855 char *path;
856
857 /*
858 * Urm, this will auto-create mdX+1, just ignore for now.
859 * This mechanic needs to be fixed. It might actually be nice
860 * to be able to export md disks.
861 */
862 if (strncmp(disk, "md", 2) == 0)
863 return;
864 if (strncmp(disk, "xa", 2) == 0)
865 return;
866
867 /*
868 * Check if already connected
869 */
870 pthread_mutex_lock(&diskmtx);
871 TAILQ_FOREACH(dc, &diskconq, entry) {
872 if (strcmp(dc->disk, disk) == 0)
873 break;
874 }
875 pthread_mutex_unlock(&diskmtx);
876 if (dc)
877 return;
878
879 /*
880 * Not already connected, create a connection to the kernel
881 * disk driver.
882 */
883 asprintf(&path, "/dev/%s", disk);
884 fd = open(path, O_RDONLY);
885 if (fd < 0) {
886 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
887 free(path);
888 return;
889 }
890 free(path);
891 if (pipe(pipefds) < 0) {
892 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
893 close(fd);
894 return;
895 }
896 bzero(&recls, sizeof(recls));
897 recls.fd = pipefds[0];
898 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
899 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
900 close(pipefds[0]);
901 close(pipefds[1]);
902 close(fd);
903 return;
904 }
905 close(pipefds[0]);
906 close(fd);
907
908 dc = malloc(sizeof(*dc));
909 dc->disk = strdup(disk);
910 pthread_mutex_lock(&diskmtx);
911 TAILQ_INSERT_TAIL(&diskconq, dc, entry);
912 pthread_mutex_unlock(&diskmtx);
913
914 info = malloc(sizeof(*info));
915 bzero(info, sizeof(*info));
916 info->fd = pipefds[1];
917 info->detachme = 1;
918 info->usrmsg_callback = hammer2_usrmsg_handler;
919 info->exit_callback = disk_disconnect;
920 info->handle = dc;
921 info->label = strdup(dc->disk);
922 pthread_create(&thread, NULL, dmsg_master_service, info);
923 }
924
925 static
926 void
disk_disconnect(void * handle)927 disk_disconnect(void *handle)
928 {
929 struct diskcon *dc = handle;
930
931 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
932
933 pthread_mutex_lock(&diskmtx);
934 TAILQ_REMOVE(&diskconq, dc, entry);
935 pthread_mutex_unlock(&diskmtx);
936 free(dc->disk);
937 free(dc);
938 }
939
940 /*
941 * Connect our cluster controller to /dev/xdisk. xdisk will pick up
942 * SPAN messages that we route to it, makes remote block devices
943 * available to the host, and can issue dmsg transactions based on
944 * device requests.
945 */
946 static
947 void
xdisk_connect(void)948 xdisk_connect(void)
949 {
950 dmsg_master_service_info_t *info;
951 struct xdisk_attach_ioctl xaioc;
952 pthread_t thread;
953 int pipefds[2];
954 int xfd;
955 int error;
956
957 /*
958 * Is /dev/xdisk available?
959 */
960 xfd = open("/dev/xdisk", O_RDWR, 0600);
961 if (xfd < 0) {
962 fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
963 return;
964 }
965
966 if (pipe(pipefds) < 0) {
967 fprintf(stderr, "xdisk_connect: pipe() failed\n");
968 return;
969 }
970
971 /*
972 * Pipe between cluster controller (this user process).
973 */
974 info = malloc(sizeof(*info));
975 bzero(info, sizeof(*info));
976 info->fd = pipefds[1];
977 info->detachme = 1;
978 info->usrmsg_callback = hammer2_usrmsg_handler;
979 info->exit_callback = NULL;
980 pthread_create(&thread, NULL, dmsg_master_service, info);
981
982 /*
983 * And the xdisk device.
984 */
985 bzero(&xaioc, sizeof(xaioc));
986 xaioc.fd = pipefds[0];
987 error = ioctl(xfd, XDISKIOCATTACH, &xaioc);
988 close(pipefds[0]);
989 close(xfd);
990
991 if (error < 0) {
992 fprintf(stderr,
993 "xdisk_connect: cannot attach %s\n",
994 strerror(errno));
995 return;
996 }
997 }
998
999 /*
1000 * Execute the specified function as a detached independent process/daemon,
1001 * unless we are in debug mode. If we are in debug mode the function is
1002 * executed as a pthread in the current process.
1003 */
1004 void
hammer2_demon(void * (* func)(void *),void * arg)1005 hammer2_demon(void *(*func)(void *), void *arg)
1006 {
1007 pthread_t thread;
1008 pid_t pid;
1009 int ttyfd;
1010
1011 /*
1012 * Do not disconnect in debug mode
1013 */
1014 if (DebugOpt) {
1015 pthread_create(&thread, NULL, func, arg);
1016 NormalExit = 0;
1017 return;
1018 }
1019
1020 /*
1021 * Otherwise disconnect us. Double-fork to get rid of the ppid
1022 * association and disconnect the TTY.
1023 */
1024 if ((pid = fork()) < 0) {
1025 fprintf(stderr, "hammer2: fork(): %s\n", strerror(errno));
1026 exit(1);
1027 }
1028 if (pid > 0) {
1029 while (waitpid(pid, NULL, 0) != pid)
1030 ;
1031 return; /* parent returns */
1032 }
1033
1034 /*
1035 * Get rid of the TTY/session before double-forking to finish off
1036 * the ppid.
1037 */
1038 ttyfd = open("/dev/null", O_RDWR);
1039 if (ttyfd >= 0) {
1040 if (ttyfd != 0)
1041 dup2(ttyfd, 0);
1042 if (ttyfd != 1)
1043 dup2(ttyfd, 1);
1044 if (ttyfd != 2)
1045 dup2(ttyfd, 2);
1046 if (ttyfd > 2)
1047 close(ttyfd);
1048 }
1049
1050 ttyfd = open("/dev/tty", O_RDWR);
1051 if (ttyfd >= 0) {
1052 ioctl(ttyfd, TIOCNOTTY, 0);
1053 close(ttyfd);
1054 }
1055 setsid();
1056
1057 /*
1058 * Second fork to disconnect ppid (the original parent waits for
1059 * us to exit).
1060 */
1061 if ((pid = fork()) < 0) {
1062 _exit(2);
1063 }
1064 if (pid > 0)
1065 _exit(0);
1066
1067 /*
1068 * The double child
1069 */
1070 setsid();
1071 pthread_create(&thread, NULL, func, arg);
1072 pthread_exit(NULL);
1073 _exit(2); /* NOT REACHED */
1074 }
1075