1 /* 2 * Copyright (c) 2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 /* 35 * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an 36 * involved explanation of the protocol. 
 */

#include "dmsg_local.h"

/*
 * Optional callback hook invoked when nodes are added to / deleted from
 * the topology (see DMSG_NODEOP_ADD/DEL uses below).  May be left NULL.
 */
void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op);

/*
 * Maximum spanning tree distance.  This has the practical effect of
 * stopping tail-chasing closed loops when a feeder span is lost.
 */
#define DMSG_SPAN_MAXDIST	16

/*
 * RED-BLACK TREE DEFINITIONS
 *
 * We need to track:
 *
 * (1) shared fsid's (a cluster).
 * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
 *
 * We need to aggregate all active LNK_SPANs and create our own
 * outgoing LNK_SPAN transactions on each of our connections representing
 * the aggregated state.
 *
 * h2span_conn	- list of iocom connections who wish to receive SPAN
 *		  propagation from other connections.  Might contain
 *		  a filter string.  Only iocom's with an open
 *		  LNK_CONN transactions are applicable for SPAN
 *		  propagation.
 *
 * h2span_relay	- List of links relayed (via SPAN).  Essentially
 *		  each relay structure represents a LNK_SPAN
 *		  transaction that we initiated, versus h2span_link
 *		  which is a LNK_SPAN transaction that we received.
 *
 * --
 *
 * h2span_cluster	- Organizes the shared fsid's.  One structure for
 *			  each cluster.
 *
 * h2span_node		- Organizes the nodes in a cluster.  One structure
 *			  for each unique {cluster,node}, aka {fsid, pfs_fsid}.
 *
 * h2span_link		- Organizes all incoming and outgoing LNK_SPAN message
 *			  transactions related to a node.
 *
 *			  One h2span_link structure for each incoming LNK_SPAN
 *			  transaction.  Links selected for propagation back
 *			  out are also where the outgoing LNK_SPAN messages
 *			  are indexed into (so we can propagate changes).
 *
 *			  The h2span_link's use a red-black tree to sort the
 *			  distance hop metric for the incoming LNK_SPAN.  We
 *			  then select the top N for outgoing.
When the 91 * topology changes the top N may also change and cause 92 * new outgoing LNK_SPAN transactions to be opened 93 * and less desireable ones to be closed, causing 94 * transactional aborts within the message flow in 95 * the process. 96 * 97 * Also note - All outgoing LNK_SPAN message transactions are also 98 * entered into a red-black tree for use by the routing 99 * function. This is handled by msg.c in the state 100 * code, not here. 101 */ 102 103 struct h2span_link; 104 struct h2span_relay; 105 TAILQ_HEAD(h2span_media_queue, h2span_media); 106 TAILQ_HEAD(h2span_conn_queue, h2span_conn); 107 TAILQ_HEAD(h2span_relay_queue, h2span_relay); 108 109 RB_HEAD(h2span_cluster_tree, h2span_cluster); 110 RB_HEAD(h2span_node_tree, h2span_node); 111 RB_HEAD(h2span_link_tree, h2span_link); 112 RB_HEAD(h2span_relay_tree, h2span_relay); 113 uint32_t DMsgRNSS; 114 115 /* 116 * This represents a media 117 */ 118 struct h2span_media { 119 TAILQ_ENTRY(h2span_media) entry; 120 uuid_t mediaid; 121 int refs; 122 struct h2span_media_config { 123 dmsg_vol_data_t copy_run; 124 dmsg_vol_data_t copy_pend; 125 pthread_t thread; 126 pthread_cond_t cond; 127 int ctl; 128 int fd; 129 int pipefd[2]; /* signal stop */ 130 dmsg_iocom_t iocom; 131 pthread_t iocom_thread; 132 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state; 133 } config[DMSG_COPYID_COUNT]; 134 }; 135 136 typedef struct h2span_media_config h2span_media_config_t; 137 138 #define H2CONFCTL_STOP 0x00000001 139 #define H2CONFCTL_UPDATE 0x00000002 140 141 /* 142 * Received LNK_CONN transaction enables SPAN protocol over connection. 143 * (may contain filter). Typically one for each mount and several may 144 * share the same media. 
 */
struct h2span_conn {
	TAILQ_ENTRY(h2span_conn) entry;		/* on connq */
	struct h2span_relay_tree tree;		/* relays we initiated out */
	struct h2span_media *media;		/* shared media (ref held) */
	dmsg_state_t *state;			/* open LNK_CONN transaction */
};

/*
 * All received LNK_SPANs are organized by cluster (pfs_clid),
 * node (pfs_fsid), and link (received LNK_SPAN transaction).
 */
struct h2span_cluster {
	RB_ENTRY(h2span_cluster) rbnode;
	struct h2span_node_tree tree;
	uuid_t	pfs_clid;		/* shared fsid */
	uint8_t	peer_type;
	char	cl_label[128];		/* cluster label (typ PEER_BLOCK) */
	int	refs;			/* prevents destruction */
};

struct h2span_node {
	RB_ENTRY(h2span_node) rbnode;
	struct h2span_link_tree tree;	/* links sorted by dist/rnss */
	struct h2span_cluster *cls;	/* owning cluster */
	uint8_t	pfs_type;
	uuid_t	pfs_fsid;		/* unique fsid */
	char	fs_label[128];		/* fs label (typ PEER_HAMMER2) */
	void	*opaque;		/* managed by dmsg_node_handler */
};

struct h2span_link {
	RB_ENTRY(h2span_link) rbnode;
	dmsg_state_t	*state;		/* state<->link */
	struct h2span_node *node;	/* related node */
	uint32_t	dist;		/* hop distance metric */
	uint32_t	rnss;		/* random sub-sort (tie breaker) */
	struct h2span_relay_queue relayq; /* relay out */
};

/*
 * Any LNK_SPAN transactions we receive which are relayed out other
 * connections utilize this structure to track the LNK_SPAN transactions
 * we initiate (relay out) on other connections.  We only relay out
 * LNK_SPANs on connections we have an open CONN transaction for.
 *
 * The relay structure points to the outgoing LNK_SPAN trans (out_state)
 * and to the incoming LNK_SPAN transaction (in_state).  The relay
 * structure holds refs on the related states.
 *
 * In many respects this is the core of the protocol... actually figuring
 * out what LNK_SPANs to relay.  The spanid used for relaying is the
 * address of the 'state' structure, which is why h2span_relay has to
 * be entered into a RB-TREE based at h2span_conn (so we can look
 * up the spanid to validate it).
 */
struct h2span_relay {
	TAILQ_ENTRY(h2span_relay) entry;	/* from link */
	RB_ENTRY(h2span_relay) rbnode;		/* from h2span_conn */
	struct h2span_conn	*conn;		/* related CONN transaction */
	dmsg_state_t		*source_rt;	/* h2span_link state */
	dmsg_state_t		*target_rt;	/* h2span_relay state */
};

typedef struct h2span_media h2span_media_t;
typedef struct h2span_conn h2span_conn_t;
typedef struct h2span_cluster h2span_cluster_t;
typedef struct h2span_node h2span_node_t;
typedef struct h2span_link h2span_link_t;
typedef struct h2span_relay h2span_relay_t;

/* Force NUL-termination of a fixed-size char array (for remote labels) */
#define dmsg_termstr(array)	_dmsg_termstr((array), sizeof(array))

static h2span_relay_t *dmsg_generate_relay(h2span_conn_t *conn,
					h2span_link_t *slink);
static uint32_t dmsg_rnss(void);

/*
 * Overwrite the last byte with NUL.  Used on labels received from the
 * wire, which are not trusted to be terminated.
 */
static __inline
void
_dmsg_termstr(char *base, size_t size)
{
	base[size-1] = 0;
}

/*
 * Cluster peer_type, uuid, AND label must match for a match
 */
static
int
h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
{
	int r;

	if (cls1->peer_type < cls2->peer_type)
		return(-1);
	if (cls1->peer_type > cls2->peer_type)
		return(1);
	r = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
	if (r == 0)
		r = strcmp(cls1->cl_label, cls2->cl_label);

	return r;
}

/*
 * Match against fs_label/pfs_fsid.  Together these two items represent a
 * unique node.  In most cases the primary differentiator is pfs_fsid but
 * we also string-match fs_label.
253 */ 254 static 255 int 256 h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2) 257 { 258 int r; 259 260 r = strcmp(node1->fs_label, node2->fs_label); 261 if (r == 0) 262 r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL); 263 return (r); 264 } 265 266 /* 267 * Sort/subsort must match h2span_relay_cmp() under any given node 268 * to make the aggregation algorithm easier, so the best links are 269 * in the same sorted order as the best relays. 270 * 271 * NOTE: We cannot use link*->state->msgid because this msgid is created 272 * by each remote host and thus might wind up being the same. 273 */ 274 static 275 int 276 h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2) 277 { 278 if (link1->dist < link2->dist) 279 return(-1); 280 if (link1->dist > link2->dist) 281 return(1); 282 if (link1->rnss < link2->rnss) 283 return(-1); 284 if (link1->rnss > link2->rnss) 285 return(1); 286 #if 1 287 if ((uintptr_t)link1->state < (uintptr_t)link2->state) 288 return(-1); 289 if ((uintptr_t)link1->state > (uintptr_t)link2->state) 290 return(1); 291 #else 292 if (link1->state->msgid < link2->state->msgid) 293 return(-1); 294 if (link1->state->msgid > link2->state->msgid) 295 return(1); 296 #endif 297 return(0); 298 } 299 300 /* 301 * Relay entries are sorted by node, subsorted by distance and link 302 * address (so we can match up the conn->tree relay topology with 303 * a node's link topology). 
 */
static
int
h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
{
	h2span_link_t *link1 = relay1->source_rt->any.link;
	h2span_link_t *link2 = relay2->source_rt->any.link;

	/*
	 * Primary sort by node address groups all relays for a node
	 * together; the per-node subsort (dist, rnss, state address)
	 * deliberately mirrors h2span_link_cmp() so the relay tree and
	 * link tree iterate in lock-step.
	 */
	if ((intptr_t)link1->node < (intptr_t)link2->node)
		return(-1);
	if ((intptr_t)link1->node > (intptr_t)link2->node)
		return(1);
	if (link1->dist < link2->dist)
		return(-1);
	if (link1->dist > link2->dist)
		return(1);
	if (link1->rnss < link2->rnss)
		return(-1);
	if (link1->rnss > link2->rnss)
		return(1);
#if 1
	if ((uintptr_t)link1->state < (uintptr_t)link2->state)
		return(-1);
	if ((uintptr_t)link1->state > (uintptr_t)link2->state)
		return(1);
#else
	if (link1->state->msgid < link2->state->msgid)
		return(-1);
	if (link1->state->msgid > link2->state->msgid)
		return(1);
#endif
	return(0);
}

RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
	     rbnode, h2span_cluster_cmp);
RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
	     rbnode, h2span_node_cmp);
RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
	     rbnode, h2span_link_cmp);
RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
	     rbnode, h2span_relay_cmp);

RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
	     rbnode, h2span_cluster_cmp);
RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
	     rbnode, h2span_node_cmp);
RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
	     rbnode, h2span_link_cmp);
RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
	     rbnode, h2span_relay_cmp);

/*
 * Global mutex protects cluster_tree lookups, connq, mediaq.
 */
static pthread_mutex_t cluster_mtx;
static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
static struct h2span_conn_queue connq = TAILQ_HEAD_INITIALIZER(connq);
static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);

static void dmsg_lnk_span(dmsg_msg_t *msg);
static void dmsg_lnk_conn(dmsg_msg_t *msg);
static void dmsg_lnk_circ(dmsg_msg_t *msg);
static void dmsg_lnk_relay(dmsg_msg_t *msg);
static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
static void dmsg_relay_delete(h2span_relay_t *relay);

static void *dmsg_volconf_thread(void *info);
static void dmsg_volconf_stop(h2span_media_config_t *conf);
static void dmsg_volconf_start(h2span_media_config_t *conf,
				const char *hostname);

/*
 * Kick a full relay rescan (all clusters, nodes, and connections).
 * Safe to call from any thread; takes cluster_mtx itself.
 */
void
dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused)
{
	pthread_mutex_lock(&cluster_mtx);
	dmsg_relay_scan(NULL, NULL);
	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * DMSG_PROTO_LNK - Generic DMSG_PROTO_LNK.
 *	      (incoming iocom lock not held)
 *
 * This function is typically called for one-way and opening-transactions
 * since state->func is assigned after that, but it will also be called
 * if no state->func is assigned on transaction-open.
 *
 * Dispatches on the transaction's initiating command (icmd) when state
 * exists, otherwise on the message's own command.
 */
void
dmsg_msg_lnk(dmsg_msg_t *msg)
{
	uint32_t icmd = msg->state ? msg->state->icmd : msg->any.head.cmd;

	switch(icmd & DMSGF_BASECMDMASK) {
	case DMSG_LNK_CONN:
		dmsg_lnk_conn(msg);
		break;
	case DMSG_LNK_SPAN:
		dmsg_lnk_span(msg);
		break;
	case DMSG_LNK_CIRC:
		dmsg_lnk_circ(msg);
		break;
	default:
		fprintf(stderr,
			"MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
		dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		/* state invalid after reply */
		break;
	}
}

/*
 * LNK_CONN - iocom identify message reception.
 *	      (incoming iocom lock not held)
 *
 * Remote node identifies itself to us, sets up a SPAN filter, and gives us
 * the ok to start transmitting SPANs.
 *
 * Runs with cluster_mtx held for the duration, except that it is briefly
 * dropped around pthread_join() during media shutdown (see below).
 */
void
dmsg_lnk_conn(dmsg_msg_t *msg)
{
	dmsg_state_t *state = msg->state;
	h2span_media_t *media;
	h2span_media_config_t *conf;
	h2span_conn_t *conn;
	h2span_relay_t *relay;
	char *alloc = NULL;
	int i;

	pthread_mutex_lock(&cluster_mtx);

	fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
		msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);

	switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
	case DMSG_LNK_CONN | DMSGF_CREATE:
	case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * On transaction start we allocate a new h2span_conn and
		 * acknowledge the request, leaving the transaction open.
		 * We then relay priority-selected SPANs.
		 */
		fprintf(stderr, "LNK_CONN(%08x): %s/%s/%s\n",
			(uint32_t)msg->any.head.msgid,
			dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
					    &alloc),
			msg->any.lnk_conn.cl_label,
			msg->any.lnk_conn.fs_label);
		free(alloc);

		conn = dmsg_alloc(sizeof(*conn));

		RB_INIT(&conn->tree);
		state->iocom->conn = conn;	/* XXX only one */
		conn->state = state;
		state->func = dmsg_lnk_conn;
		state->any.conn = conn;
		TAILQ_INSERT_TAIL(&connq, conn, entry);

		/*
		 * Set up media.  Look up an existing media entry by its
		 * uuid, creating one if this is the first LNK_CONN to
		 * reference it.
		 */
		TAILQ_FOREACH(media, &mediaq, entry) {
			if (uuid_compare(&msg->any.lnk_conn.mediaid,
					 &media->mediaid, NULL) == 0) {
				break;
			}
		}
		if (media == NULL) {
			media = dmsg_alloc(sizeof(*media));
			media->mediaid = msg->any.lnk_conn.mediaid;
			TAILQ_INSERT_TAIL(&mediaq, media, entry);
		}
		conn->media = media;
		++media->refs;

		if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
			dmsg_msg_result(msg, 0);
			dmsg_iocom_signal(msg->iocom);
			break;
		}
		/* FALL THROUGH (CREATE|DELETE in one message) */
	case DMSG_LNK_CONN | DMSGF_DELETE:
	case DMSG_LNK_ERROR | DMSGF_DELETE:
deleteconn:
		/*
		 * On transaction terminate we clean out our h2span_conn
		 * and acknowledge the request, closing the transaction.
		 */
		fprintf(stderr, "LNK_CONN: Terminated\n");
		conn = state->any.conn;
		assert(conn);

		/*
		 * Clean out the media structure. If refs drops to zero we
		 * also clean out the media config threads.  These threads
		 * maintain span connections to other hammer2 service daemons.
		 */
		media = conn->media;
		if (--media->refs == 0) {
			fprintf(stderr, "Shutting down media spans\n");
			/*
			 * Pass 1: flag every active worker to stop and wake
			 * it.  Plain assignment (not |=) discards a pending
			 * H2CONFCTL_UPDATE, presumably intentional since the
			 * thread is exiting anyway -- TODO confirm.
			 *
			 * NOTE(review): comparing a pthread_t against NULL
			 * is not portable; pthread_t need not be a pointer.
			 */
			for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
				conf = &media->config[i];

				if (conf->thread == NULL)
					continue;
				conf->ctl = H2CONFCTL_STOP;
				pthread_cond_signal(&conf->cond);
			}
			/*
			 * Pass 2: join each worker.  cluster_mtx must be
			 * dropped around pthread_join() because the workers
			 * may need the mutex to finish -- assumes no other
			 * thread frees 'media' in the window.
			 */
			for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
				conf = &media->config[i];

				if (conf->thread == NULL)
					continue;
				pthread_mutex_unlock(&cluster_mtx);
				pthread_join(conf->thread, NULL);
				pthread_mutex_lock(&cluster_mtx);
				conf->thread = NULL;
				pthread_cond_destroy(&conf->cond);
			}
			fprintf(stderr, "Media shutdown complete\n");
			TAILQ_REMOVE(&mediaq, media, entry);
			dmsg_free(media);
		}

		/*
		 * Clean out all relays.  This requires terminating each
		 * relay transaction.
		 */
		while ((relay = RB_ROOT(&conn->tree)) != NULL) {
			dmsg_relay_delete(relay);
		}

		/*
		 * Clean out conn
		 */
		conn->media = NULL;
		conn->state = NULL;
		msg->state->any.conn = NULL;
		msg->state->iocom->conn = NULL;
		TAILQ_REMOVE(&connq, conn, entry);
		dmsg_free(conn);

		dmsg_msg_reply(msg, 0);
		/* state invalid after reply */
		break;
	case DMSG_LNK_VOLCONF:
		/*
		 * One-way volume-configuration message is transmitted
		 * over the open LNK_CONN transaction.
		 */
		fprintf(stderr, "RECEIVED VOLCONF\n");
		/* validate index from the wire before using it */
		if (msg->any.lnk_volconf.index < 0 ||
		    msg->any.lnk_volconf.index >= DMSG_COPYID_COUNT) {
			fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
				msg->any.lnk_volconf.index);
			break;
		}
		/* path must be non-empty and NUL-terminated */
		if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
		    msg->any.lnk_volconf.copy.path[0] == 0) {
			fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
				msg->any.lnk_volconf.index);
			break;
		}
		conn = msg->state->any.conn;
		if (conn == NULL) {
			fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
			break;
		}
		/*
		 * Stage the new copy config and wake (or start) the worker
		 * thread for this slot.
		 */
		conf = &conn->media->config[msg->any.lnk_volconf.index];
		conf->copy_pend = msg->any.lnk_volconf.copy;
		conf->ctl |= H2CONFCTL_UPDATE;
		if (conf->thread == NULL) {
			fprintf(stderr, "VOLCONF THREAD STARTED\n");
			pthread_cond_init(&conf->cond, NULL);
			pthread_create(&conf->thread, NULL,
				       dmsg_volconf_thread, (void *)conf);
		}
		pthread_cond_signal(&conf->cond);
		break;
	default:
		/*
		 * Failsafe
		 */
		if (msg->any.head.cmd & DMSGF_DELETE)
			goto deleteconn;
		dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	}
	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * LNK_SPAN - Spanning tree protocol message reception
 *	      (incoming iocom lock not held)
 *
 * Receive a spanning tree transactional message, creating or destroying
 * a SPAN and propagating it to other iocoms.
 */
void
dmsg_lnk_span(dmsg_msg_t *msg)
{
	dmsg_state_t *state = msg->state;
	h2span_cluster_t dummy_cls;
	h2span_node_t dummy_node;
	h2span_cluster_t *cls;
	h2span_node_t *node;
	h2span_link_t *slink;
	h2span_relay_t *relay;
	char *alloc = NULL;

	assert((msg->any.head.cmd & DMSGF_REPLY) == 0);

	pthread_mutex_lock(&cluster_mtx);

	/*
	 * On transaction start we initialize the tracking infrastructure
	 * (cluster -> node -> link), creating any missing levels.
	 */
	if (msg->any.head.cmd & DMSGF_CREATE) {
		assert(state->func == NULL);
		state->func = dmsg_lnk_span;

		/* labels come off the wire; force NUL termination */
		dmsg_termstr(msg->any.lnk_span.cl_label);
		dmsg_termstr(msg->any.lnk_span.fs_label);

		/*
		 * Find the cluster (lookup by {peer_type, pfs_clid,
		 * cl_label} via a stack dummy key).
		 */
		dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
		dummy_cls.peer_type = msg->any.lnk_span.peer_type;
		bcopy(msg->any.lnk_span.cl_label,
		      dummy_cls.cl_label,
		      sizeof(dummy_cls.cl_label));
		cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
		if (cls == NULL) {
			cls = dmsg_alloc(sizeof(*cls));
			cls->pfs_clid = msg->any.lnk_span.pfs_clid;
			cls->peer_type = msg->any.lnk_span.peer_type;
			bcopy(msg->any.lnk_span.cl_label,
			      cls->cl_label,
			      sizeof(cls->cl_label));
			RB_INIT(&cls->tree);
			RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
		}

		/*
		 * Find the node within the cluster, creating it (and
		 * notifying the optional node handler) if needed.
		 */
		dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
		bcopy(msg->any.lnk_span.fs_label, dummy_node.fs_label,
		      sizeof(dummy_node.fs_label));
		node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
		if (node == NULL) {
			node = dmsg_alloc(sizeof(*node));
			node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
			node->pfs_type = msg->any.lnk_span.pfs_type;
			bcopy(msg->any.lnk_span.fs_label,
			      node->fs_label,
			      sizeof(node->fs_label));
			node->cls = cls;
			RB_INIT(&node->tree);
			RB_INSERT(h2span_node_tree, &cls->tree, node);
			if (dmsg_node_handler) {
				dmsg_node_handler(&node->opaque, msg,
						  DMSG_NODEOP_ADD);
			}
		}

		/*
		 * Create the link and cross-wire it with the transaction
		 * state (state->any.link <-> slink->state).
		 */
		assert(state->any.link == NULL);
		slink = dmsg_alloc(sizeof(*slink));
		TAILQ_INIT(&slink->relayq);
		slink->node = node;
		slink->dist = msg->any.lnk_span.dist;
		slink->rnss = msg->any.lnk_span.rnss;
		slink->state = state;
		state->any.link = slink;

		RB_INSERT(h2span_link_tree, &node->tree, slink);

		fprintf(stderr,
			"LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
			msg->iocom,
			slink,
			dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
			msg->any.lnk_span.cl_label,
			msg->any.lnk_span.fs_label,
			msg->any.lnk_span.dist);
		free(alloc);
#if 0
		dmsg_relay_scan(NULL, node);
#endif
		dmsg_iocom_signal(msg->iocom);
	}

	/*
	 * On transaction terminate we remove the tracking infrastructure,
	 * pruning empty nodes/clusters bottom-up.
	 */
	if (msg->any.head.cmd & DMSGF_DELETE) {
		slink = state->any.link;
		assert(slink != NULL);
		node = slink->node;
		cls = node->cls;

		fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
			msg->iocom,
			slink,
			dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
			state->msg->any.lnk_span.cl_label,
			state->msg->any.lnk_span.fs_label,
			state->msg->any.lnk_span.dist);
		free(alloc);

		/*
		 * Clean out all relays.  This requires terminating each
		 * relay transaction.
		 */
		while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
			dmsg_relay_delete(relay);
		}

		/*
		 * Clean out the topology.  Order matters: the link must be
		 * removed before the node can be tested for emptiness, and
		 * the node before the cluster.  A cluster with refs held
		 * is preserved even when empty.
		 */
		RB_REMOVE(h2span_link_tree, &node->tree, slink);
		if (RB_EMPTY(&node->tree)) {
			RB_REMOVE(h2span_node_tree, &cls->tree, node);
			if (dmsg_node_handler) {
				dmsg_node_handler(&node->opaque, msg,
						  DMSG_NODEOP_DEL);
			}
			if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
				RB_REMOVE(h2span_cluster_tree,
					  &cluster_tree, cls);
				dmsg_free(cls);
			}
			node->cls = NULL;
			dmsg_free(node);
			node = NULL;	/* signals "nothing left" below */
		}
		state->any.link = NULL;
		slink->state = NULL;
		slink->node = NULL;
		dmsg_free(slink);

		/*
		 * We have to terminate the transaction
		 */
		dmsg_state_reply(state, 0);
		/* state invalid after reply */

		/*
		 * If the node still exists issue any required updates.  If
		 * it doesn't then all related relays have already been
		 * removed and there's nothing left to do.
		 */
#if 0
		if (node)
			dmsg_relay_scan(NULL, node);
#endif
		if (node)
			dmsg_iocom_signal(msg->iocom);
	}

	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * LNK_CIRC - Virtual circuit protocol message reception
 *	      (incoming iocom lock not held)
 *
 * Handles all cases.
 */
void
dmsg_lnk_circ(dmsg_msg_t *msg)
{
	dmsg_circuit_t *circA;
	dmsg_circuit_t *circB;
	dmsg_state_t *rx_state;
	dmsg_state_t *tx_state;
	dmsg_state_t *state;
	dmsg_state_t dummy;
	dmsg_msg_t *fwd_msg;
	dmsg_iocom_t *iocomA;
	dmsg_iocom_t *iocomB;
	int disconnect;

	/*pthread_mutex_lock(&cluster_mtx);*/

	if (DMsgDebugOpt >= 4)
		fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);

	switch (msg->any.head.cmd & (DMSGF_CREATE |
				     DMSGF_DELETE |
				     DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * (A) wishes to establish a virtual circuit through us to (B).
		 * (B) is specified by lnk_circ.target (the message id for
		 * a LNK_SPAN that (A) received from us which represents (B)).
		 *
		 * Designate the originator of the circuit (the current
		 * remote end) as (A) and the other side as (B).
		 *
		 * Accept the VC but do not reply.  We will wait for the end-
		 * to-end reply to propagate back.
		 */
		iocomA = msg->iocom;

		/*
		 * Locate the open transaction state that the other end
		 * specified in <target>.  This will be an open SPAN
		 * transaction that we transmitted (h2span_relay) over
		 * the interface the LNK_CIRC is being received on.
		 *
		 * (all LNK_CIRC's that we transmit are on circuit0)
		 */
		pthread_mutex_lock(&iocomA->mtx);
		dummy.msgid = msg->any.lnk_circ.target;
		tx_state = RB_FIND(dmsg_state_tree,
				   &iocomA->circuit0.statewr_tree,
				   &dummy);
		pthread_mutex_unlock(&iocomA->mtx);
		if (tx_state == NULL) {
			/* XXX SMP race */
			fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
			dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
			break;
		}
		if (tx_state->icmd != DMSG_LNK_SPAN) {
			/* XXX SMP race */
			fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
			dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
			break;
		}

		/* locate h2span_link */
		rx_state = tx_state->any.relay->source_rt;

		/*
		 * A wishes to establish a VC through us to the
		 * specified target.
		 *
		 * A sends us the msgid of an open SPAN transaction
		 * it received from us as <target>.
		 */
		circA = dmsg_alloc(sizeof(*circA));
		dmsg_circuit_init(iocomA, circA);
		circA->state = msg->state;	/* LNK_CIRC state */
		circA->msgid = msg->state->msgid;
		circA->span_state = tx_state;	/* H2SPAN_RELAY state */
		circA->is_relay = 1;
		circA->refs = 2;		/* state and peer */

		/*
		 * Upgrade received state so we act on both it and its
		 * peer (created below) symmetrically.
		 */
		msg->state->any.circ = circA;
		msg->state->func = dmsg_lnk_circ;

		iocomB = rx_state->iocom;

		circB = dmsg_alloc(sizeof(*circB));
		dmsg_circuit_init(iocomB, circB);

		/*
		 * Create a LNK_CIRC transaction on B
		 */
		fwd_msg = dmsg_msg_alloc(&iocomB->circuit0,
					 0, DMSG_LNK_CIRC | DMSGF_CREATE,
					 dmsg_lnk_circ, circB);
		fwd_msg->state->any.circ = circB;
		fwd_msg->any.lnk_circ.target = rx_state->msgid;
		circB->state = fwd_msg->state;	/* LNK_CIRC state */
		circB->msgid = fwd_msg->any.head.msgid;
		circB->span_state = rx_state;	/* H2SPAN_LINK state */
		circB->is_relay = 0;
		circB->refs = 2;		/* state and peer */

		if (DMsgDebugOpt >= 4)
			fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);

		/*
		 * Link the two circuits together.
		 */
		circA->peer = circB;
		circB->peer = circA;

		/*
		 * Insert both circuits under their iocoms.  Both mutexes
		 * are taken in a canonical (address) order to avoid an
		 * AB/BA deadlock against other two-iocom operations.
		 */
		if (iocomA < iocomB) {
			pthread_mutex_lock(&iocomA->mtx);
			pthread_mutex_lock(&iocomB->mtx);
		} else {
			pthread_mutex_lock(&iocomB->mtx);
			pthread_mutex_lock(&iocomA->mtx);
		}
		if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
			assert(0);
		if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
			assert(0);
		if (iocomA < iocomB) {
			pthread_mutex_unlock(&iocomB->mtx);
			pthread_mutex_unlock(&iocomA->mtx);
		} else {
			pthread_mutex_unlock(&iocomA->mtx);
			pthread_mutex_unlock(&iocomB->mtx);
		}

		dmsg_msg_write(fwd_msg);

		if ((msg->any.head.cmd & DMSGF_DELETE) == 0)
			break;
		/* FALL THROUGH TO DELETE */
	case DMSGF_DELETE:
		/*
		 * (A) Is deleting the virtual circuit, propagate closure
		 * to (B).
		 */
		iocomA = msg->iocom;
		if (msg->state->any.circ == NULL) {
			/* already returned an error/deleted */
			break;
		}
		circA = msg->state->any.circ;
		circB = circA->peer;
		assert(msg->state == circA->state);

		/*
		 * We are closing B's send side.  If B's receive side is
		 * already closed we disconnect the circuit from B's state.
		 */
		disconnect = 0;
		if (circB && (state = circB->state) != NULL) {
			if (state->rxcmd & DMSGF_DELETE) {
				disconnect = 1;
				circB->state = NULL;
				state->any.circ = NULL;
				dmsg_circuit_drop(circB);
			}
			dmsg_state_reply(state, msg->any.head.error);
		}

		/*
		 * We received a close on A.  If A's send side is already
		 * closed we disconnect the circuit from A's state.
		 */
		if (circA && (state = circA->state) != NULL) {
			if (state->txcmd & DMSGF_DELETE) {
				disconnect = 1;
				circA->state = NULL;
				state->any.circ = NULL;
				dmsg_circuit_drop(circA);
			}
		}

		/*
		 * Disconnect the peer<->peer association
		 */
		if (disconnect) {
			if (circB) {
				circA->peer = NULL;
				circB->peer = NULL;
				dmsg_circuit_drop(circA);
				dmsg_circuit_drop(circB);	/* XXX SMP */
			}
		}
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * (B) is acknowledging the creation of the virtual
		 * circuit.  This propagates all the way back to (A), though
		 * it should be noted that (A) can start issuing commands
		 * via the virtual circuit before seeing this reply.
		 */
		circB = msg->state->any.circ;
		assert(circB);
		circA = circB->peer;
		assert(msg->state == circB->state);
		assert(circA);
		if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
			dmsg_state_result(circA->state, msg->any.head.error);
			break;
		}
		/* FALL THROUGH TO DELETE */
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * (B) Is deleting the virtual circuit or acknowledging
		 * our deletion of the virtual circuit, propagate closure
		 * to (A).
		 */
		iocomB = msg->iocom;
		circB = msg->state->any.circ;
		circA = circB->peer;
		assert(msg->state == circB->state);

		/*
		 * We received a close on (B), propagate to (A).  If we have
		 * already received the close from (A) we disconnect the state.
		 */
		disconnect = 0;
		if (circA && (state = circA->state) != NULL) {
			if (state->rxcmd & DMSGF_DELETE) {
				disconnect = 1;
				circA->state = NULL;
				state->any.circ = NULL;
				dmsg_circuit_drop(circA);
			}
			dmsg_state_reply(state, msg->any.head.error);
		}

		/*
		 * We received a close on (B).  If (B)'s send side is already
		 * closed we disconnect the state.
		 */
		if (circB && (state = circB->state) != NULL) {
			if (state->txcmd & DMSGF_DELETE) {
				disconnect = 1;
				circB->state = NULL;
				state->any.circ = NULL;
				dmsg_circuit_drop(circB);
			}
		}

		/*
		 * Disconnect the peer<->peer association
		 */
		if (disconnect) {
			if (circA) {
				circB->peer = NULL;
				circA->peer = NULL;
				dmsg_circuit_drop(circB);
				dmsg_circuit_drop(circA);	/* XXX SMP */
			}
		}
		break;
	}

	/*pthread_mutex_unlock(&cluster_mtx);*/

	/*
	 * NOTE(review): the commented-out line above originally read
	 * "lock" but it mirrors the commented-out lock at function entry,
	 * so "unlock" is the intended operation should it ever be enabled.
	 */
}

/*
 * Update relay transactions for SPANs.
 *
 * Called with cluster_mtx held.
 */
static void dmsg_relay_scan_specific(h2span_node_t *node,
				h2span_conn_t *conn);

static void
dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node)
{
	h2span_cluster_t *cls;

	if (node) {
		/*
		 * Iterate specific node
		 */
		TAILQ_FOREACH(conn, &connq, entry)
			dmsg_relay_scan_specific(node, conn);
	} else {
		/*
		 * Full iteration.
		 *
		 * Iterate cluster ids, nodes, and either a specific connection
		 * or all connections.
		 */
		RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
			/*
			 * Iterate node ids
			 */
			RB_FOREACH(node, h2span_node_tree, &cls->tree) {
				/*
				 * Synchronize the node's link (received SPANs)
				 * with each connection's relays.
1083 */ 1084 if (conn) { 1085 dmsg_relay_scan_specific(node, conn); 1086 } else { 1087 TAILQ_FOREACH(conn, &connq, entry) { 1088 dmsg_relay_scan_specific(node, 1089 conn); 1090 } 1091 assert(conn == NULL); 1092 } 1093 } 1094 } 1095 } 1096 } 1097 1098 /* 1099 * Update the relay'd SPANs for this (node, conn). 1100 * 1101 * Iterate links and adjust relays to match. We only propagate the top link 1102 * for now (XXX we want to propagate the top two). 1103 * 1104 * The dmsg_relay_scan_cmp() function locates the first relay element 1105 * for any given node. The relay elements will be sub-sorted by dist. 1106 */ 1107 struct relay_scan_info { 1108 h2span_node_t *node; 1109 h2span_relay_t *relay; 1110 }; 1111 1112 static int 1113 dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg) 1114 { 1115 struct relay_scan_info *info = arg; 1116 1117 if ((intptr_t)relay->source_rt->any.link->node < (intptr_t)info->node) 1118 return(-1); 1119 if ((intptr_t)relay->source_rt->any.link->node > (intptr_t)info->node) 1120 return(1); 1121 return(0); 1122 } 1123 1124 static int 1125 dmsg_relay_scan_callback(h2span_relay_t *relay, void *arg) 1126 { 1127 struct relay_scan_info *info = arg; 1128 1129 info->relay = relay; 1130 return(-1); 1131 } 1132 1133 static void 1134 dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) 1135 { 1136 struct relay_scan_info info; 1137 h2span_relay_t *relay; 1138 h2span_relay_t *next_relay; 1139 h2span_link_t *slink; 1140 dmsg_lnk_conn_t *lconn; 1141 dmsg_lnk_span_t *lspan; 1142 int count; 1143 int maxcount = 2; 1144 #ifdef REQUIRE_SYMMETRICAL 1145 uint32_t lastdist = DMSG_SPAN_MAXDIST; 1146 uint32_t lastrnss = 0; 1147 #endif 1148 1149 info.node = node; 1150 info.relay = NULL; 1151 1152 /* 1153 * Locate the first related relay for the node on this connection. 1154 * relay will be NULL if there were none. 
1155 */ 1156 RB_SCAN(h2span_relay_tree, &conn->tree, 1157 dmsg_relay_scan_cmp, dmsg_relay_scan_callback, &info); 1158 relay = info.relay; 1159 info.relay = NULL; 1160 if (relay) 1161 assert(relay->source_rt->any.link->node == node); 1162 1163 if (DMsgDebugOpt > 8) 1164 fprintf(stderr, "relay scan for connection %p\n", conn); 1165 1166 /* 1167 * Iterate the node's links (received SPANs) in distance order, 1168 * lowest (best) dist first. 1169 * 1170 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION. 1171 * 1172 * Track relays while iterating the best links and construct 1173 * missing relays when necessary. 1174 * 1175 * (If some prior better link was removed it would have also 1176 * removed the relay, so the relay can only match exactly or 1177 * be worse). 1178 */ 1179 count = 0; 1180 RB_FOREACH(slink, h2span_link_tree, &node->tree) { 1181 /* 1182 * Increment count of successful relays. This isn't 1183 * quite accurate if we break out but nothing after 1184 * the loop uses (count). 1185 * 1186 * If count exceeds the maximum number of relays we desire 1187 * we normally want to break out. However, in order to 1188 * guarantee a symmetric path we have to continue if both 1189 * (dist) and (rnss) continue to match. Otherwise the SPAN 1190 * propagation in the reverse direction may choose different 1191 * routes and we will not have a symmetric path. 1192 * 1193 * NOTE: Spanning tree does not have to be symmetrical so 1194 * this code is not currently enabled. 1195 */ 1196 if (++count >= maxcount) { 1197 #ifdef REQUIRE_SYMMETRICAL 1198 if (lastdist != slink->dist || lastrnss != slink->rnss) 1199 break; 1200 #else 1201 break; 1202 #endif 1203 /* go beyond the nominal maximum desired relays */ 1204 } 1205 1206 /* 1207 * Match, relay already in-place, get the next 1208 * relay to match against the next slink. 
1209 */ 1210 if (relay && relay->source_rt->any.link == slink) { 1211 relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); 1212 continue; 1213 } 1214 1215 /* 1216 * We might want this SLINK, if it passes our filters. 1217 * 1218 * The spanning tree can cause closed loops so we have 1219 * to limit slink->dist. 1220 */ 1221 if (slink->dist > DMSG_SPAN_MAXDIST) 1222 break; 1223 1224 /* 1225 * Don't bother transmitting a LNK_SPAN out the same 1226 * connection it came in on. Trivial optimization. 1227 */ 1228 if (slink->state->iocom == conn->state->iocom) 1229 break; 1230 1231 /* 1232 * NOTE ON FILTERS: The protocol spec allows non-requested 1233 * SPANs to be transmitted, the other end is expected to 1234 * leave their transactions open but otherwise ignore them. 1235 * 1236 * Don't bother transmitting if the remote connection 1237 * is not accepting this SPAN's peer_type. 1238 * 1239 * pfs_mask is typically used so pure clients can filter 1240 * out receiving SPANs for other pure clients. 1241 */ 1242 lspan = &slink->state->msg->any.lnk_span; 1243 lconn = &conn->state->msg->any.lnk_conn; 1244 if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0) 1245 break; 1246 if (((1LLU << lspan->pfs_type) & lconn->pfs_mask) == 0) 1247 break; 1248 1249 /* 1250 * Do not give pure clients visibility to other pure clients 1251 */ 1252 if (lconn->pfs_type == DMSG_PFSTYPE_CLIENT && 1253 lspan->pfs_type == DMSG_PFSTYPE_CLIENT) { 1254 break; 1255 } 1256 1257 /* 1258 * Connection filter, if cluster uuid is not NULL it must 1259 * match the span cluster uuid. Only applies when the 1260 * peer_type matches. 1261 */ 1262 if (lspan->peer_type == lconn->peer_type && 1263 !uuid_is_nil(&lconn->pfs_clid, NULL) && 1264 uuid_compare(&slink->node->cls->pfs_clid, 1265 &lconn->pfs_clid, NULL)) { 1266 break; 1267 } 1268 1269 /* 1270 * Connection filter, if cluster label is not empty it must 1271 * match the span cluster label. Only applies when the 1272 * peer_type matches. 
1273 */ 1274 if (lspan->peer_type == lconn->peer_type && 1275 lconn->cl_label[0] && 1276 strcmp(lconn->cl_label, slink->node->cls->cl_label)) { 1277 break; 1278 } 1279 1280 /* 1281 * NOTE! pfs_fsid differentiates nodes within the same cluster 1282 * so we obviously don't want to match those. Similarly 1283 * for fs_label. 1284 */ 1285 1286 /* 1287 * Ok, we've accepted this SPAN for relaying. 1288 */ 1289 assert(relay == NULL || 1290 relay->source_rt->any.link->node != slink->node || 1291 relay->source_rt->any.link->dist >= slink->dist); 1292 relay = dmsg_generate_relay(conn, slink); 1293 #ifdef REQUIRE_SYMMETRICAL 1294 lastdist = slink->dist; 1295 lastrnss = slink->rnss; 1296 #endif 1297 1298 /* 1299 * Match (created new relay), get the next relay to 1300 * match against the next slink. 1301 */ 1302 relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); 1303 } 1304 1305 /* 1306 * Any remaining relay's belonging to this connection which match 1307 * the node are in excess of the current aggregate spanning state 1308 * and should be removed. 1309 */ 1310 while (relay && relay->source_rt->any.link->node == node) { 1311 next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); 1312 fprintf(stderr, "RELAY DELETE FROM EXTRAS\n"); 1313 dmsg_relay_delete(relay); 1314 relay = next_relay; 1315 } 1316 } 1317 1318 /* 1319 * Helper function to generate missing relay. 1320 * 1321 * cluster_mtx must be held 1322 */ 1323 static 1324 h2span_relay_t * 1325 dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink) 1326 { 1327 h2span_relay_t *relay; 1328 dmsg_msg_t *msg; 1329 1330 relay = dmsg_alloc(sizeof(*relay)); 1331 relay->conn = conn; 1332 relay->source_rt = slink->state; 1333 /* relay->source_rt->any.link = slink; */ 1334 1335 /* 1336 * NOTE: relay->target_rt->any.relay set to relay by alloc. 
1337 */ 1338 msg = dmsg_msg_alloc(&conn->state->iocom->circuit0, 1339 0, DMSG_LNK_SPAN | DMSGF_CREATE, 1340 dmsg_lnk_relay, relay); 1341 relay->target_rt = msg->state; 1342 1343 msg->any.lnk_span = slink->state->msg->any.lnk_span; 1344 msg->any.lnk_span.dist = slink->dist + 1; 1345 msg->any.lnk_span.rnss = slink->rnss + dmsg_rnss(); 1346 1347 RB_INSERT(h2span_relay_tree, &conn->tree, relay); 1348 TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); 1349 1350 dmsg_msg_write(msg); 1351 1352 return (relay); 1353 } 1354 1355 /* 1356 * Messages received on relay SPANs. These are open transactions so it is 1357 * in fact possible for the other end to close the transaction. 1358 * 1359 * XXX MPRACE on state structure 1360 */ 1361 static void 1362 dmsg_lnk_relay(dmsg_msg_t *msg) 1363 { 1364 dmsg_state_t *state = msg->state; 1365 h2span_relay_t *relay; 1366 1367 assert(msg->any.head.cmd & DMSGF_REPLY); 1368 1369 if (msg->any.head.cmd & DMSGF_DELETE) { 1370 pthread_mutex_lock(&cluster_mtx); 1371 fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n"); 1372 if ((relay = state->any.relay) != NULL) { 1373 dmsg_relay_delete(relay); 1374 } else { 1375 dmsg_state_reply(state, 0); 1376 } 1377 pthread_mutex_unlock(&cluster_mtx); 1378 } 1379 } 1380 1381 /* 1382 * cluster_mtx held by caller 1383 */ 1384 static 1385 void 1386 dmsg_relay_delete(h2span_relay_t *relay) 1387 { 1388 fprintf(stderr, 1389 "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n", 1390 relay->source_rt->any.link, 1391 relay, 1392 relay->source_rt->any.link->node->cls, relay->source_rt->any.link->node, 1393 relay->source_rt->any.link->dist, 1394 relay->conn->state->iocom->sock_fd, relay->target_rt); 1395 1396 RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay); 1397 TAILQ_REMOVE(&relay->source_rt->any.link->relayq, relay, entry); 1398 1399 if (relay->target_rt) { 1400 relay->target_rt->any.relay = NULL; 1401 dmsg_state_reply(relay->target_rt, 0); 1402 /* state invalid after reply */ 1403 
relay->target_rt = NULL; 1404 } 1405 relay->conn = NULL; 1406 relay->source_rt = NULL; 1407 dmsg_free(relay); 1408 } 1409 1410 static void * 1411 dmsg_volconf_thread(void *info) 1412 { 1413 h2span_media_config_t *conf = info; 1414 1415 pthread_mutex_lock(&cluster_mtx); 1416 while ((conf->ctl & H2CONFCTL_STOP) == 0) { 1417 if (conf->ctl & H2CONFCTL_UPDATE) { 1418 fprintf(stderr, "VOLCONF UPDATE\n"); 1419 conf->ctl &= ~H2CONFCTL_UPDATE; 1420 if (bcmp(&conf->copy_run, &conf->copy_pend, 1421 sizeof(conf->copy_run)) == 0) { 1422 fprintf(stderr, "VOLCONF: no changes\n"); 1423 continue; 1424 } 1425 /* 1426 * XXX TODO - auto reconnect on lookup failure or 1427 * connect failure or stream failure. 1428 */ 1429 1430 pthread_mutex_unlock(&cluster_mtx); 1431 dmsg_volconf_stop(conf); 1432 conf->copy_run = conf->copy_pend; 1433 if (conf->copy_run.copyid != 0 && 1434 strncmp(conf->copy_run.path, "span:", 5) == 0) { 1435 dmsg_volconf_start(conf, 1436 conf->copy_run.path + 5); 1437 } 1438 pthread_mutex_lock(&cluster_mtx); 1439 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state); 1440 } 1441 if (conf->state == H2MC_CONNECT) { 1442 dmsg_volconf_start(conf, conf->copy_run.path + 5); 1443 pthread_mutex_unlock(&cluster_mtx); 1444 sleep(5); 1445 pthread_mutex_lock(&cluster_mtx); 1446 } else { 1447 pthread_cond_wait(&conf->cond, &cluster_mtx); 1448 } 1449 } 1450 pthread_mutex_unlock(&cluster_mtx); 1451 dmsg_volconf_stop(conf); 1452 return(NULL); 1453 } 1454 1455 static void dmsg_volconf_signal(dmsg_iocom_t *iocom); 1456 1457 static 1458 void 1459 dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname) 1460 { 1461 dmsg_master_service_info_t *info; 1462 1463 switch(conf->state) { 1464 case H2MC_STOPPED: 1465 case H2MC_CONNECT: 1466 conf->fd = dmsg_connect(hostname); 1467 if (conf->fd < 0) { 1468 fprintf(stderr, "Unable to connect to %s\n", hostname); 1469 conf->state = H2MC_CONNECT; 1470 } else if (pipe(conf->pipefd) < 0) { 1471 close(conf->fd); 1472 fprintf(stderr, 
"pipe() failed during volconf\n"); 1473 conf->state = H2MC_CONNECT; 1474 } else { 1475 fprintf(stderr, "VOLCONF CONNECT\n"); 1476 info = malloc(sizeof(*info)); 1477 bzero(info, sizeof(*info)); 1478 info->fd = conf->fd; 1479 info->altfd = conf->pipefd[0]; 1480 info->altmsg_callback = dmsg_volconf_signal; 1481 info->detachme = 0; 1482 conf->state = H2MC_RUNNING; 1483 pthread_create(&conf->iocom_thread, NULL, 1484 dmsg_master_service, info); 1485 } 1486 break; 1487 case H2MC_RUNNING: 1488 break; 1489 } 1490 } 1491 1492 static 1493 void 1494 dmsg_volconf_stop(h2span_media_config_t *conf) 1495 { 1496 switch(conf->state) { 1497 case H2MC_STOPPED: 1498 break; 1499 case H2MC_CONNECT: 1500 conf->state = H2MC_STOPPED; 1501 break; 1502 case H2MC_RUNNING: 1503 close(conf->pipefd[1]); 1504 conf->pipefd[1] = -1; 1505 pthread_join(conf->iocom_thread, NULL); 1506 conf->iocom_thread = NULL; 1507 conf->state = H2MC_STOPPED; 1508 break; 1509 } 1510 } 1511 1512 static 1513 void 1514 dmsg_volconf_signal(dmsg_iocom_t *iocom) 1515 { 1516 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 1517 } 1518 1519 /************************************************************************ 1520 * MESSAGE ROUTING AND SOURCE VALIDATION * 1521 ************************************************************************/ 1522 1523 int 1524 dmsg_circuit_route(dmsg_msg_t *msg) 1525 { 1526 dmsg_iocom_t *iocom = msg->iocom; 1527 dmsg_circuit_t *circ; 1528 dmsg_circuit_t *peer; 1529 dmsg_circuit_t dummy; 1530 int error = 0; 1531 1532 /* 1533 * Relay occurs before any state processing, msg state should always 1534 * be NULL. 1535 */ 1536 assert(msg->state == NULL); 1537 1538 /* 1539 * Lookup the circuit on the incoming iocom. 
1540 */ 1541 pthread_mutex_lock(&iocom->mtx); 1542 1543 dummy.msgid = msg->any.head.circuit; 1544 circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy); 1545 assert(circ); 1546 peer = circ->peer; 1547 dmsg_circuit_hold(peer); 1548 1549 if (DMsgDebugOpt >= 4) { 1550 fprintf(stderr, 1551 "CIRC relay %08x %p->%p\n", 1552 msg->any.head.cmd, circ, peer); 1553 } 1554 1555 msg->iocom = peer->iocom; 1556 msg->any.head.circuit = peer->msgid; 1557 dmsg_circuit_drop_locked(msg->circuit); 1558 msg->circuit = peer; 1559 1560 pthread_mutex_unlock(&iocom->mtx); 1561 1562 dmsg_msg_write(msg); 1563 error = DMSG_IOQ_ERROR_ROUTED; 1564 1565 return error; 1566 } 1567 1568 /************************************************************************ 1569 * ROUTER AND MESSAGING HANDLES * 1570 ************************************************************************ 1571 * 1572 * Basically the idea here is to provide a stable data structure which 1573 * can be localized to the caller for higher level protocols to work with. 1574 * Depends on the context, these dmsg_handle's can be pooled by use-case 1575 * and remain persistent through a client (or mount point's) life. 1576 */ 1577 1578 #if 0 1579 /* 1580 * Obtain a stable handle on a cluster given its uuid. This ties directly 1581 * into the global cluster topology, creating the structure if necessary 1582 * (even if the uuid does not exist or does not exist yet), and preventing 1583 * the structure from getting ripped out from under us while we hold a 1584 * pointer to it. 
 */
h2span_cluster_t *
dmsg_cluster_get(uuid_t *pfs_clid)
{
	h2span_cluster_t dummy_cls;
	h2span_cluster_t *cls;

	dummy_cls.pfs_clid = *pfs_clid;
	pthread_mutex_lock(&cluster_mtx);
	cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
	if (cls)
		++cls->refs;
	pthread_mutex_unlock(&cluster_mtx);
	return (cls);
}

/*
 * Release a cluster handle obtained via dmsg_cluster_get().  Frees the
 * cluster when the last reference goes away and its node tree is empty.
 */
void
dmsg_cluster_put(h2span_cluster_t *cls)
{
	pthread_mutex_lock(&cluster_mtx);
	assert(cls->refs > 0);
	--cls->refs;
	if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
		RB_REMOVE(h2span_cluster_tree,
			  &cluster_tree, cls);
		dmsg_free(cls);
	}
	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * Obtain a stable handle to a specific cluster node given its uuid.
 * This handle does NOT lock in the route to the node and is typically
 * used as part of the dmsg_handle_*() API to obtain a set of
 * stable nodes.
 *
 * NOTE: unimplemented stub (inside #if 0).
 */
h2span_node_t *
dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
{
}

#endif

/*
 * Dumps the spanning tree
 *
 * Walks cluster -> node -> link and the relays hanging off each link,
 * printing one indented line per entry over (circuit).
 *
 * DEBUG ONLY
 */
void
dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
{
	h2span_cluster_t *cls;
	h2span_node_t *node;
	h2span_link_t *slink;
	h2span_relay_t *relay;
	char *uustr = NULL;	/* lazily allocated by dmsg_uuid_to_str() */

	pthread_mutex_lock(&cluster_mtx);
	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
		dmsg_circuit_printf(circuit, "Cluster %s %s (%s)\n",
				    dmsg_peer_type_to_str(cls->peer_type),
				    dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
				    cls->cl_label);
		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
			dmsg_circuit_printf(circuit, " Node %s %s (%s)\n",
				dmsg_pfs_type_to_str(node->pfs_type),
				dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
				node->fs_label);
			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
				dmsg_circuit_printf(circuit,
					"\tSLink msgid %016jx "
					"dist=%d via %d\n",
					(intmax_t)slink->state->msgid,
					slink->dist,
					slink->state->iocom->sock_fd);
				TAILQ_FOREACH(relay, &slink->relayq, entry) {
					dmsg_circuit_printf(circuit,
					    "\t Relay-out msgid %016jx "
					    "via %d\n",
					    (intmax_t)relay->target_rt->msgid,
					    relay->target_rt->iocom->sock_fd);
				}
			}
		}
	}
	pthread_mutex_unlock(&cluster_mtx);
	if (uustr)
		free(uustr);
#if 0
	TAILQ_FOREACH(conn, &connq, entry) {
	}
#endif
}

/*
 * DEBUG ONLY
 *
 * Locate the state representing an incoming LNK_SPAN given its msgid.
 *
 * Returns 0 with *statep set on success, ENOENT with *statep NULL if
 * no matching link exists.
 */
int
dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
{
	h2span_cluster_t *cls;
	h2span_node_t *node;
	h2span_link_t *slink;

	pthread_mutex_lock(&cluster_mtx);
	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
				if (slink->state->msgid == msgid) {
					*statep = slink->state;
					goto found;
				}
			}
		}
	}
	pthread_mutex_unlock(&cluster_mtx);
	*statep = NULL;
	return(ENOENT);
found:
	pthread_mutex_unlock(&cluster_mtx);
	return(0);
}

/*
 * Random number sub-sort value to add to SPAN rnss fields on relay.
 * This allows us to differentiate spans with the same <dist> field
 * for relaying purposes.  We must normally limit the number of relays
 * for any given SPAN origination but we must also guarantee that a
 * symmetric reverse path exists, so we use the rnss field as a sub-sort
 * (since there can be thousands or millions if we only match on <dist>),
 * and if there STILL too many spans we go past the limit.
1718 */ 1719 static 1720 uint32_t 1721 dmsg_rnss(void) 1722 { 1723 if (DMsgRNSS == 0) { 1724 pthread_mutex_lock(&cluster_mtx); 1725 while (DMsgRNSS == 0) { 1726 srandomdev(); 1727 DMsgRNSS = random(); 1728 } 1729 pthread_mutex_unlock(&cluster_mtx); 1730 } 1731 return(DMsgRNSS); 1732 } 1733