xref: /dragonfly/lib/libdmsg/msg_lnk.c (revision 666e46d7)
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
36  *
37  * This code supports the LNK_SPAN protocol.  Essentially all PFS's
38  * clients and services rendezvous with the userland hammer2 service and
39  * open LNK_SPAN transactions using a message header linkid of 0,
40  * registering any PFS's they have connectivity to with us.
41  *
42  * --
43  *
44  * Each registration maintains its own open LNK_SPAN message transaction.
45  * The SPANs are collected, aggregated, and retransmitted over available
46  * connections through the maintenance of additional LNK_SPAN message
47  * transactions on each link.
48  *
49  * The msgid for each active LNK_SPAN transaction we receive allows us to
50  * send a message to the target PFS (which might be one of many belonging
51  * to the same cluster), by specifying that msgid as the linkid in any
52  * message we send to the target PFS.
53  *
54  * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
55  * (and remember we will maintain multiple open LNK_SPAN transactions on
56  * each connection representing the topology span, so every node sees every
57  * other node as a separate open transaction).  So, similarly the msgid for
58  * these active transactions which we initiated can be used by the other
59  * end to route messages through us to another node, ultimately winding up
60  * at the identified hammer2 PFS.  We have to adjust the spanid in the message
61  * header at each hop to be representative of the outgoing LNK_SPAN we
62  * are forwarding the message through.
63  *
64  * --
65  *
66  * If we were to retransmit every LNK_SPAN transaction we receive it would
67  * create a huge mess, so we have to aggregate all received LNK_SPAN
68  * transactions, sort them by the fsid (the cluster) and sub-sort them by
69  * the pfs_fsid (individual nodes in the cluster), and only retransmit
70  * (create outgoing transactions) for a subset of the nearest distance-hops
71  * for each individual node.
72  *
73  * The higher level protocols can then issue transactions to the nodes making
74  * up a cluster to perform all actions required.
75  *
76  * --
77  *
78  * Since this is a large topology and a spanning tree protocol, links can
79  * go up and down all the time.  Any time a link goes down its transaction
80  * is closed.  The transaction has to be closed on both ends before we can
81  * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
82  * closed may have been propagated out to other connections and those related
83  * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
84  * go away, ultimately reaching all sources and all targets.
85  *
86  * Any messages in-transit using a route that goes away will be thrown away.
87  * Open transactions are only tracked at the two end-points.  When a link
88  * failure propagates to an end-point the related open transactions lose
89  * their spanid and are automatically aborted.
90  *
91  * It is important to note that internal route nodes cannot just associate
92  * a lost LNK_SPAN transaction with another route to the same destination.
93  * Message transactions MUST be serialized and MUST be ordered.  All messages
94  * for a transaction must run over the same route.  So if the route used by
95  * an active transaction is lost, the related messages will be fully aborted
96  * and the higher protocol levels will retry as appropriate.
97  *
98  * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
99  * back to the originator.  Only the originator keeps tracks of a message.
100  * Routers just pass it through.  If a route is lost during transit the
101  * message is simply thrown away.
102  *
103  * It is also important to note that several paths to the same PFS can be
104  * propagated along the same link, which allows concurrency and even
105  * redundancy over several network interfaces or via different routes through
106  * the topology.  Any given transaction will use only a single route but busy
107  * servers will often have hundreds of transactions active simultaneously,
108  * so having multiple active paths through the network topology for A<->B
109  * will improve performance.
110  *
111  * --
112  *
113  * Most protocols consolidate operations rather than simply relaying them.
114  * This is particularly true of LEAF protocols (such as strict HAMMER2
115  * clients), of which there can be millions connecting into the cluster at
116  * various points.  The SPAN protocol is not used for these LEAF elements.
117  *
118  * Instead the primary service they connect to implements a proxy for the
119  * client protocols so the core topology only has to propagate a couple of
120  * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
121  * core master nodes and satellite slaves and cache nodes.
122  */
123 
124 #include "dmsg_local.h"
125 
126 /*
127  * Maximum spanning tree distance.  This has the practical effect of
128  * stopping tail-chasing closed loops when a feeder span is lost.
129  */
130 #define DMSG_SPAN_MAXDIST	16
131 
132 /*
133  * RED-BLACK TREE DEFINITIONS
134  *
135  * We need to track:
136  *
137  * (1) shared fsid's (a cluster).
138  * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
139  *
140  * We need to aggregate all active LNK_SPANs and create our own
141  * outgoing LNK_SPAN transactions on each of our connections representing
142  * the aggregated state.
143  *
144  * h2span_conn		- list of iocom connections who wish to receive SPAN
145  *			  propagation from other connections.  Might contain
146  *			  a filter string.  Only iocom's with an open
147  *			  LNK_CONN transactions are applicable for SPAN
148  *			  propagation.
149  *
150  * h2span_relay		- List of links relayed (via SPAN).  Essentially
151  *			  each relay structure represents a LNK_SPAN
152  *			  transaction that we initiated, versus h2span_link
153  *			  which is a LNK_SPAN transaction that we received.
154  *
155  * --
156  *
157  * h2span_cluster	- Organizes the shared fsid's.  One structure for
158  *			  each cluster.
159  *
160  * h2span_node		- Organizes the nodes in a cluster.  One structure
161  *			  for each unique {cluster,node}, aka {fsid, pfs_fsid}.
162  *
163  * h2span_link		- Organizes all incoming and outgoing LNK_SPAN message
164  *			  transactions related to a node.
165  *
166  *			  One h2span_link structure for each incoming LNK_SPAN
167  *			  transaction.  Links selected for propagation back
168  *			  out are also where the outgoing LNK_SPAN messages
169  *			  are indexed into (so we can propagate changes).
170  *
171  *			  The h2span_link's use a red-black tree to sort the
172  *			  distance hop metric for the incoming LNK_SPAN.  We
173  *			  then select the top N for outgoing.  When the
174  *			  topology changes the top N may also change and cause
175  *			  new outgoing LNK_SPAN transactions to be opened
176  *			  and less desirable ones to be closed, causing
177  *			  transactional aborts within the message flow in
178  *			  the process.
179  *
180  * Also note		- All outgoing LNK_SPAN message transactions are also
181  *			  entered into a red-black tree for use by the routing
182  *			  function.  This is handled by msg.c in the state
183  *			  code, not here.
184  */
185 
/* Forward declarations: these structs are referenced before definition. */
struct h2span_link;
struct h2span_relay;
/* List heads (sys/queue.h) for the media, connection, and relay lists. */
TAILQ_HEAD(h2span_media_queue, h2span_media);
TAILQ_HEAD(h2span_conn_queue, h2span_conn);
TAILQ_HEAD(h2span_relay_queue, h2span_relay);

/* Red-black tree heads (sys/tree.h) for the span topology. */
RB_HEAD(h2span_cluster_tree, h2span_cluster);
RB_HEAD(h2span_node_tree, h2span_node);
RB_HEAD(h2span_relay_tree, h2span_relay);
196 
/*
 * This represents a media: a storage identity (mediaid) that may be
 * shared by several LNK_CONN connections.  Reference counted via refs
 * (one per connection); freed when the last connection terminates.
 */
struct h2span_media {
	TAILQ_ENTRY(h2span_media) entry;	/* mediaq linkage */
	uuid_t	mediaid;
	int	refs;		/* #connections sharing this media */
	/*
	 * Per-copy volume configuration.  Each slot may run its own
	 * config thread (dmsg_volconf_thread) which maintains a span
	 * connection to another hammer2 service daemon.
	 */
	struct h2span_media_config {
		dmsg_vol_data_t		copy_run;	/* active config */
		dmsg_vol_data_t		copy_pend;	/* pending update */
		pthread_t		thread;		/* config thread */
		pthread_cond_t		cond;		/* wakes thread on ctl change */
		int			ctl;		/* H2CONFCTL_* flags */
		int			fd;
		dmsg_iocom_t		iocom;
		pthread_t		iocom_thread;
		enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
	} config[DMSG_COPYID_COUNT];
};

typedef struct h2span_media_config h2span_media_config_t;

/* Control flags for h2span_media_config.ctl */
#define H2CONFCTL_STOP		0x00000001	/* request thread shutdown */
#define H2CONFCTL_UPDATE	0x00000002	/* copy_pend holds new config */
221 
/*
 * Received LNK_CONN transaction enables SPAN protocol over connection.
 * (may contain filter).  Typically one for each mount and several may
 * share the same media.
 */
struct h2span_conn {
	TAILQ_ENTRY(h2span_conn) entry;	/* connq linkage */
	struct h2span_relay_tree tree;	/* LNK_SPANs we relay out this conn */
	struct h2span_media *media;	/* shared media (holds a ref) */
	dmsg_state_t *state;		/* open LNK_CONN transaction state */
};
233 
/*
 * All received LNK_SPANs are organized by cluster (pfs_clid),
 * node (pfs_fsid), and link (received LNK_SPAN transaction).
 *
 * One h2span_cluster per unique {peer_type, pfs_clid, cl_label};
 * see h2span_cluster_cmp().
 */
struct h2span_cluster {
	RB_ENTRY(h2span_cluster) rbnode;	/* cluster_tree linkage */
	struct h2span_node_tree tree;		/* nodes in this cluster */
	uuid_t	pfs_clid;		/* shared fsid */
	uint8_t	peer_type;
	char	cl_label[128];		/* cluster label (typ PEER_BLOCK) */
	int	refs;			/* prevents destruction */
};
246 
/* One h2span_node per unique {cluster, pfs_fsid} pair. */
struct h2span_node {
	RB_ENTRY(h2span_node) rbnode;	/* cls->tree linkage */
	struct h2span_link_tree tree;	/* received LNK_SPANs for this node */
	struct h2span_cluster *cls;	/* owning cluster */
	uint8_t	pfs_type;
	uuid_t	pfs_fsid;		/* unique fsid */
	char	fs_label[128];		/* fs label (typ PEER_HAMMER2) */
};
255 
/* One h2span_link per received (incoming) LNK_SPAN transaction. */
struct h2span_link {
	RB_ENTRY(h2span_link) rbnode;	/* node->tree, sorted by dist */
	dmsg_state_t	*state;		/* state<->link */
	struct h2span_node *node;	/* related node */
	int32_t	dist;			/* spanning-tree distance metric */
	struct h2span_relay_queue relayq; /* relay out */
	struct dmsg_router *router;	/* route out this link */
};
264 
/*
 * Any LNK_SPAN transactions we receive which are relayed out other
 * connections utilize this structure to track the LNK_SPAN transaction
 * we initiate on the other connections, if selected for relay.
 *
 * In many respects this is the core of the protocol... actually figuring
 * out what LNK_SPANs to relay.  The spanid used for relaying is the
 * address of the 'state' structure, which is why h2span_relay has to
 * be entered into a RB-TREE based at h2span_conn (so we can look
 * up the spanid to validate it).
 *
 * NOTE: Messages can be received via the LNK_SPAN transaction the
 *	 relay maintains, and can be replied via relay->router, but
 *	 messages are NOT initiated via a relay.  Messages are initiated
 *	 via incoming links (h2span_link's).
 *
 *	 relay->link represents the link being relayed, NOT the LNK_SPAN
 *	 transaction the relay is holding open.
 */
struct h2span_relay {
	RB_ENTRY(h2span_relay) rbnode;	/* from h2span_conn */
	TAILQ_ENTRY(h2span_relay) entry; /* from link */
	struct h2span_conn *conn;	/* connection we relay out over */
	dmsg_state_t	*state;		/* transmitted LNK_SPAN */
	struct h2span_link *link;	/* LNK_SPAN being relayed */
	struct dmsg_router	*router;/* route out this relay */
};
292 
293 
/* Convenience typedefs for the span-tracking structures above. */
typedef struct h2span_media h2span_media_t;
typedef struct h2span_conn h2span_conn_t;
typedef struct h2span_cluster h2span_cluster_t;
typedef struct h2span_node h2span_node_t;
typedef struct h2span_link h2span_link_t;
typedef struct h2span_relay h2span_relay_t;
300 
#define dmsg_termstr(array)	_dmsg_termstr((array), sizeof(array))

/*
 * Guarantee NUL-termination of a fixed-size string buffer by forcing
 * the final byte to zero.  Use the dmsg_termstr() macro on true arrays.
 */
static __inline
void
_dmsg_termstr(char *base, size_t size)
{
	base[size - 1] = '\0';
}
309 
310 /*
311  * Cluster peer_type, uuid, AND label must match for a match
312  */
313 static
314 int
315 h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
316 {
317 	int r;
318 
319 	if (cls1->peer_type < cls2->peer_type)
320 		return(-1);
321 	if (cls1->peer_type > cls2->peer_type)
322 		return(1);
323 	r = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
324 	if (r == 0)
325 		r = strcmp(cls1->cl_label, cls2->cl_label);
326 
327 	return r;
328 }
329 
330 /*
331  * Match against the uuid.  Currently we never match against the label.
332  */
333 static
334 int
335 h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
336 {
337 	int r;
338 
339 	r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
340 	return (r);
341 }
342 
343 /*
344  * Sort/subsort must match h2span_relay_cmp() under any given node
345  * to make the aggregation algorithm easier, so the best links are
346  * in the same sorted order as the best relays.
347  *
348  * NOTE: We cannot use link*->state->msgid because this msgid is created
349  *	 by each remote host and thus might wind up being the same.
350  */
351 static
352 int
353 h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
354 {
355 	if (link1->dist < link2->dist)
356 		return(-1);
357 	if (link1->dist > link2->dist)
358 		return(1);
359 #if 1
360 	if ((uintptr_t)link1->state < (uintptr_t)link2->state)
361 		return(-1);
362 	if ((uintptr_t)link1->state > (uintptr_t)link2->state)
363 		return(1);
364 #else
365 	if (link1->state->msgid < link2->state->msgid)
366 		return(-1);
367 	if (link1->state->msgid > link2->state->msgid)
368 		return(1);
369 #endif
370 	return(0);
371 }
372 
373 /*
374  * Relay entries are sorted by node, subsorted by distance and link
375  * address (so we can match up the conn->tree relay topology with
376  * a node's link topology).
377  */
378 static
379 int
380 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
381 {
382 	h2span_link_t *link1 = relay1->link;
383 	h2span_link_t *link2 = relay2->link;
384 
385 	if ((intptr_t)link1->node < (intptr_t)link2->node)
386 		return(-1);
387 	if ((intptr_t)link1->node > (intptr_t)link2->node)
388 		return(1);
389 	if (link1->dist < link2->dist)
390 		return(-1);
391 	if (link1->dist > link2->dist)
392 		return(1);
393 #if 1
394 	if ((uintptr_t)link1->state < (uintptr_t)link2->state)
395 		return(-1);
396 	if ((uintptr_t)link1->state > (uintptr_t)link2->state)
397 		return(1);
398 #else
399 	if (link1->state->msgid < link2->state->msgid)
400 		return(-1);
401 	if (link1->state->msgid > link2->state->msgid)
402 		return(1);
403 #endif
404 	return(0);
405 }
406 
/*
 * Generate static red-black tree support routines (insert/remove/find/
 * scan) for the four span-tracking trees, keyed by the comparison
 * functions defined above (see tree(3)).
 */
RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
	     rbnode, h2span_cluster_cmp);
RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
	     rbnode, h2span_node_cmp);
RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
	     rbnode, h2span_link_cmp);
RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
	     rbnode, h2span_relay_cmp);

RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
	     rbnode, h2span_cluster_cmp);
RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
	     rbnode, h2span_node_cmp);
RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
	     rbnode, h2span_link_cmp);
RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
	     rbnode, h2span_relay_cmp);
424 
425 /*
426  * Global mutex protects cluster_tree lookups, connq, mediaq.
427  */
428 static pthread_mutex_t cluster_mtx;
429 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
430 static struct h2span_conn_queue connq = TAILQ_HEAD_INITIALIZER(connq);
431 static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
432 
433 static void dmsg_lnk_span(dmsg_msg_t *msg);
434 static void dmsg_lnk_conn(dmsg_msg_t *msg);
435 static void dmsg_lnk_relay(dmsg_msg_t *msg);
436 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
437 static void dmsg_relay_delete(h2span_relay_t *relay);
438 
439 static void *dmsg_volconf_thread(void *info);
440 static void dmsg_volconf_stop(h2span_media_config_t *conf);
441 static void dmsg_volconf_start(h2span_media_config_t *conf,
442 				const char *hostname);
443 
/*
 * Poke the link-protocol layer: rescan all nodes against all
 * connections and bring the relayed SPAN set up to date.
 * Acquires cluster_mtx for the duration of the scan.
 */
void
dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
{
	pthread_mutex_lock(&cluster_mtx);
	dmsg_relay_scan(NULL, NULL);
	pthread_mutex_unlock(&cluster_mtx);
}
451 
452 /*
453  * Receive a DMSG_PROTO_LNK message.  This only called for
454  * one-way and opening-transactions since state->func will be assigned
455  * in all other cases.
456  */
457 void
458 dmsg_msg_lnk(dmsg_msg_t *msg)
459 {
460 	switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
461 	case DMSG_LNK_CONN:
462 		dmsg_lnk_conn(msg);
463 		break;
464 	case DMSG_LNK_SPAN:
465 		dmsg_lnk_span(msg);
466 		break;
467 	default:
468 		fprintf(stderr,
469 			"MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
470 		dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
471 		/* state invalid after reply */
472 		break;
473 	}
474 }
475 
/*
 * LNK_CONN transaction handler.
 *
 * Manages the lifetime of the h2span_conn representing an open LNK_CONN
 * transaction: allocation on CREATE, teardown on DELETE, and processing
 * of one-way LNK_VOLCONF messages received over the open transaction.
 * cluster_mtx is held for the duration (temporarily dropped around
 * pthread_join, see below).
 */
void
dmsg_lnk_conn(dmsg_msg_t *msg)
{
	dmsg_state_t *state = msg->state;
	h2span_media_t *media;
	h2span_media_config_t *conf;
	h2span_conn_t *conn;
	h2span_relay_t *relay;
	char *alloc = NULL;
	int i;

	pthread_mutex_lock(&cluster_mtx);

	switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
	case DMSG_LNK_CONN | DMSGF_CREATE:
	case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * On transaction start we allocate a new h2span_conn and
		 * acknowledge the request, leaving the transaction open.
		 * We then relay priority-selected SPANs.
		 */
		fprintf(stderr, "LNK_CONN(%08x): %s/%s/%s\n",
			(uint32_t)msg->any.head.msgid,
			dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
					    &alloc),
			msg->any.lnk_conn.cl_label,
			msg->any.lnk_conn.fs_label);
		free(alloc);

		conn = dmsg_alloc(sizeof(*conn));

		RB_INIT(&conn->tree);
		conn->state = state;
		state->func = dmsg_lnk_conn;	/* route further msgs here */
		state->any.conn = conn;
		TAILQ_INSERT_TAIL(&connq, conn, entry);

		/*
		 * Set up media.  Look up an existing shared media by
		 * mediaid, creating one if this is the first connection
		 * referencing it.
		 */
		TAILQ_FOREACH(media, &mediaq, entry) {
			if (uuid_compare(&msg->any.lnk_conn.mediaid,
					 &media->mediaid, NULL) == 0) {
				break;
			}
		}
		if (media == NULL) {
			media = dmsg_alloc(sizeof(*media));
			media->mediaid = msg->any.lnk_conn.mediaid;
			TAILQ_INSERT_TAIL(&mediaq, media, entry);
		}
		conn->media = media;
		++media->refs;		/* one ref per connection */

		if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
			dmsg_msg_result(msg, 0);
			dmsg_router_signal(msg->router);
			break;
		}
		/* FALL THROUGH */
	case DMSG_LNK_CONN | DMSGF_DELETE:
	case DMSG_LNK_ERROR | DMSGF_DELETE:
deleteconn:
		/*
		 * On transaction terminate we clean out our h2span_conn
		 * and acknowledge the request, closing the transaction.
		 */
		fprintf(stderr, "LNK_CONN: Terminated\n");
		conn = state->any.conn;
		assert(conn);

		/*
		 * Clean out the media structure. If refs drops to zero we
		 * also clean out the media config threads.  These threads
		 * maintain span connections to other hammer2 service daemons.
		 */
		media = conn->media;
		if (--media->refs == 0) {
			fprintf(stderr, "Shutting down media spans\n");
			/* First pass: ask every config thread to stop. */
			for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
				conf = &media->config[i];

				if (conf->thread == NULL)
					continue;
				conf->ctl = H2CONFCTL_STOP;
				pthread_cond_signal(&conf->cond);
			}
			/*
			 * Second pass: reap the threads.  cluster_mtx is
			 * dropped across pthread_join so an exiting thread
			 * that needs the mutex cannot deadlock against us.
			 */
			for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
				conf = &media->config[i];

				if (conf->thread == NULL)
					continue;
				pthread_mutex_unlock(&cluster_mtx);
				pthread_join(conf->thread, NULL);
				pthread_mutex_lock(&cluster_mtx);
				conf->thread = NULL;
				pthread_cond_destroy(&conf->cond);
			}
			fprintf(stderr, "Media shutdown complete\n");
			TAILQ_REMOVE(&mediaq, media, entry);
			dmsg_free(media);
		}

		/*
		 * Clean out all relays.  This requires terminating each
		 * relay transaction.
		 */
		while ((relay = RB_ROOT(&conn->tree)) != NULL) {
			dmsg_relay_delete(relay);
		}

		/*
		 * Clean out conn
		 */
		conn->media = NULL;
		conn->state = NULL;
		msg->state->any.conn = NULL;
		TAILQ_REMOVE(&connq, conn, entry);
		dmsg_free(conn);

		dmsg_msg_reply(msg, 0);
		/* state invalid after reply */
		break;
	case DMSG_LNK_VOLCONF:
		/*
		 * One-way volume-configuration message is transmitted
		 * over the open LNK_CONN transaction.
		 */
		fprintf(stderr, "RECEIVED VOLCONF\n");
		/* Validate copy index against the config array bounds. */
		if (msg->any.lnk_volconf.index < 0 ||
		    msg->any.lnk_volconf.index >= DMSG_COPYID_COUNT) {
			fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
				msg->any.lnk_volconf.index);
			break;
		}
		/* Path must be non-empty and NUL-terminated in-bounds. */
		if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
		    msg->any.lnk_volconf.copy.path[0] == 0) {
			fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
				msg->any.lnk_volconf.index);
			break;
		}
		conn = msg->state->any.conn;
		if (conn == NULL) {
			fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
			break;
		}
		/*
		 * Stage the new config and wake (or start) the config
		 * thread for this copy slot.
		 */
		conf = &conn->media->config[msg->any.lnk_volconf.index];
		conf->copy_pend = msg->any.lnk_volconf.copy;
		conf->ctl |= H2CONFCTL_UPDATE;
		if (conf->thread == NULL) {
			fprintf(stderr, "VOLCONF THREAD STARTED\n");
			pthread_cond_init(&conf->cond, NULL);
			pthread_create(&conf->thread, NULL,
				       dmsg_volconf_thread, (void *)conf);
		}
		pthread_cond_signal(&conf->cond);
		break;
	default:
		/*
		 * Failsafe
		 */
		if (msg->any.head.cmd & DMSGF_DELETE)
			goto deleteconn;
		dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	}
	pthread_mutex_unlock(&cluster_mtx);
}
644 
/*
 * LNK_SPAN transaction handler.
 *
 * On CREATE, registers the received SPAN in the cluster/node/link
 * topology (creating cluster and node entries as needed) and attaches
 * a router so messages can be forwarded out this link.  On DELETE,
 * tears all of that down again and closes the transaction.
 * cluster_mtx is held for the duration.
 */
void
dmsg_lnk_span(dmsg_msg_t *msg)
{
	dmsg_state_t *state = msg->state;
	h2span_cluster_t dummy_cls;	/* stack key for RB_FIND */
	h2span_node_t dummy_node;	/* stack key for RB_FIND */
	h2span_cluster_t *cls;
	h2span_node_t *node;
	h2span_link_t *slink;
	h2span_relay_t *relay;
	char *alloc = NULL;

	assert((msg->any.head.cmd & DMSGF_REPLY) == 0);

	pthread_mutex_lock(&cluster_mtx);

	/*
	 * On transaction start we initialize the tracking infrastructure
	 */
	if (msg->any.head.cmd & DMSGF_CREATE) {
		assert(state->func == NULL);
		state->func = dmsg_lnk_span;

		/* Force NUL-termination of the received labels. */
		dmsg_termstr(msg->any.lnk_span.cl_label);
		dmsg_termstr(msg->any.lnk_span.fs_label);

		/*
		 * Find the cluster, creating it if this is the first
		 * SPAN for {peer_type, pfs_clid, cl_label}.
		 */
		dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
		dummy_cls.peer_type = msg->any.lnk_span.peer_type;
		bcopy(msg->any.lnk_span.cl_label,
		      dummy_cls.cl_label,
		      sizeof(dummy_cls.cl_label));
		cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
		if (cls == NULL) {
			cls = dmsg_alloc(sizeof(*cls));
			cls->pfs_clid = msg->any.lnk_span.pfs_clid;
			cls->peer_type = msg->any.lnk_span.peer_type;
			bcopy(msg->any.lnk_span.cl_label,
			      cls->cl_label,
			      sizeof(cls->cl_label));
			RB_INIT(&cls->tree);
			RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
		}

		/*
		 * Find the node within the cluster, creating it if this
		 * is the first SPAN for pfs_fsid.
		 */
		dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
		node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
		if (node == NULL) {
			node = dmsg_alloc(sizeof(*node));
			node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
			bcopy(msg->any.lnk_span.fs_label,
			      node->fs_label,
			      sizeof(node->fs_label));
			node->cls = cls;
			RB_INIT(&node->tree);
			RB_INSERT(h2span_node_tree, &cls->tree, node);
		}

		/*
		 * Create the link
		 */
		assert(state->any.link == NULL);
		slink = dmsg_alloc(sizeof(*slink));
		TAILQ_INIT(&slink->relayq);
		slink->node = node;
		slink->dist = msg->any.lnk_span.dist;
		slink->state = state;
		state->any.link = slink;

		/*
		 * Embedded router structure in link for message forwarding.
		 *
		 * The spanning id for the router is the message id of
		 * the SPAN link it is embedded in, allowing messages to
		 * be routed via &slink->router.
		 */
		slink->router = dmsg_router_alloc();
		slink->router->iocom = state->iocom;
		slink->router->link = slink;
		slink->router->target = state->msgid;
		dmsg_router_connect(slink->router);

		RB_INSERT(h2span_link_tree, &node->tree, slink);

		fprintf(stderr,
			"LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
			msg->router->iocom,
			slink,
			dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
			msg->any.lnk_span.cl_label,
			msg->any.lnk_span.fs_label,
			msg->any.lnk_span.dist);
		free(alloc);
#if 0
		dmsg_relay_scan(NULL, node);
#endif
		/* Kick the relay scan to propagate the new SPAN. */
		dmsg_router_signal(msg->router);
	}

	/*
	 * On transaction terminate we remove the tracking infrastructure.
	 */
	if (msg->any.head.cmd & DMSGF_DELETE) {
		slink = state->any.link;
		assert(slink != NULL);
		node = slink->node;
		cls = node->cls;

		fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
			msg->router->iocom,
			slink,
			dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
			state->msg->any.lnk_span.cl_label,
			state->msg->any.lnk_span.fs_label,
			state->msg->any.lnk_span.dist);
		free(alloc);

		/*
		 * Remove the router from consideration
		 */
		dmsg_router_disconnect(&slink->router);

		/*
		 * Clean out all relays.  This requires terminating each
		 * relay transaction.
		 */
		while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
			dmsg_relay_delete(relay);
		}

		/*
		 * Clean out the topology.  Empty nodes are freed; an
		 * empty cluster is freed only if nothing holds a ref.
		 */
		RB_REMOVE(h2span_link_tree, &node->tree, slink);
		if (RB_EMPTY(&node->tree)) {
			RB_REMOVE(h2span_node_tree, &cls->tree, node);
			if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
				RB_REMOVE(h2span_cluster_tree,
					  &cluster_tree, cls);
				dmsg_free(cls);
			}
			node->cls = NULL;
			dmsg_free(node);
			node = NULL;	/* NULL means no update needed below */
		}
		state->any.link = NULL;
		slink->state = NULL;
		slink->node = NULL;
		dmsg_free(slink);

		/*
		 * We have to terminate the transaction
		 */
		dmsg_state_reply(state, 0);
		/* state invalid after reply */

		/*
		 * If the node still exists issue any required updates.  If
		 * it doesn't then all related relays have already been
		 * removed and there's nothing left to do.
		 */
#if 0
		if (node)
			dmsg_relay_scan(NULL, node);
#endif
		if (node)
			dmsg_router_signal(msg->router);
	}

	pthread_mutex_unlock(&cluster_mtx);
}
820 
821 /*
822  * Messages received on relay SPANs.  These are open transactions so it is
823  * in fact possible for the other end to close the transaction.
824  *
825  * XXX MPRACE on state structure
826  */
827 static void
828 dmsg_lnk_relay(dmsg_msg_t *msg)
829 {
830 	dmsg_state_t *state = msg->state;
831 	h2span_relay_t *relay;
832 
833 	assert(msg->any.head.cmd & DMSGF_REPLY);
834 
835 	if (msg->any.head.cmd & DMSGF_DELETE) {
836 		pthread_mutex_lock(&cluster_mtx);
837 		if ((relay = state->any.relay) != NULL) {
838 			dmsg_relay_delete(relay);
839 		} else {
840 			dmsg_state_reply(state, 0);
841 		}
842 		pthread_mutex_unlock(&cluster_mtx);
843 	}
844 }
845 
846 /*
847  * Update relay transactions for SPANs.
848  *
849  * Called with cluster_mtx held.
850  */
851 static void dmsg_relay_scan_specific(h2span_node_t *node,
852 					h2span_conn_t *conn);
853 
854 static void
855 dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node)
856 {
857 	h2span_cluster_t *cls;
858 
859 	if (node) {
860 		/*
861 		 * Iterate specific node
862 		 */
863 		TAILQ_FOREACH(conn, &connq, entry)
864 			dmsg_relay_scan_specific(node, conn);
865 	} else {
866 		/*
867 		 * Full iteration.
868 		 *
869 		 * Iterate cluster ids, nodes, and either a specific connection
870 		 * or all connections.
871 		 */
872 		RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
873 			/*
874 			 * Iterate node ids
875 			 */
876 			RB_FOREACH(node, h2span_node_tree, &cls->tree) {
877 				/*
878 				 * Synchronize the node's link (received SPANs)
879 				 * with each connection's relays.
880 				 */
881 				if (conn) {
882 					dmsg_relay_scan_specific(node, conn);
883 				} else {
884 					TAILQ_FOREACH(conn, &connq, entry) {
885 					    dmsg_relay_scan_specific(node,
886 									conn);
887 					}
888 					assert(conn == NULL);
889 				}
890 			}
891 		}
892 	}
893 }
894 
/*
 * Update the relay'd SPANs for this (node, conn).
 *
 * Iterate links and adjust relays to match.  We only propagate the top link
 * for now (XXX we want to propagate the top two).
 *
 * The dmsg_relay_scan_cmp() function locates the first relay element
 * for any given node.  The relay elements will be sub-sorted by dist.
 */
struct relay_scan_info {
	h2span_node_t *node;	/* node being scanned (comparison key) */
	h2span_relay_t *relay;	/* first matching relay found, or NULL */
};
908 
/*
 * RB_SCAN comparator: order relays by the address of the node their
 * relayed link belongs to, so the scan lands on the first relay for
 * info->node.
 */
static int
dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
{
	struct relay_scan_info *info = arg;

	if ((intptr_t)relay->link->node < (intptr_t)info->node)
		return(-1);
	if ((intptr_t)relay->link->node > (intptr_t)info->node)
		return(1);
	return(0);
}
920 
921 static int
922 dmsg_relay_scan_callback(h2span_relay_t *relay, void *arg)
923 {
924 	struct relay_scan_info *info = arg;
925 
926 	info->relay = relay;
927 	return(-1);
928 }
929 
static void
dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
{
	struct relay_scan_info info;
	h2span_relay_t *relay;
	h2span_relay_t *next_relay;
	h2span_link_t *slink;
	dmsg_lnk_conn_t *lconn;
	dmsg_lnk_span_t *lspan;
	dmsg_msg_t *msg;
	int count = 2;		/* relay at most the two best links per node */

	info.node = node;
	info.relay = NULL;

	/*
	 * Locate the first related relay for the node on this connection.
	 * relay will be NULL if there were none.
	 */
	RB_SCAN(h2span_relay_tree, &conn->tree,
		dmsg_relay_scan_cmp, dmsg_relay_scan_callback, &info);
	relay = info.relay;
	info.relay = NULL;
	if (relay)
		assert(relay->link->node == node);

	if (DMsgDebugOpt > 8)
		fprintf(stderr, "relay scan for connection %p\n", conn);

	/*
	 * Iterate the node's links (received SPANs) in distance order,
	 * lowest (best) dist first.
	 *
	 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
	 *
	 * Track relays while iterating the best links and construct
	 * missing relays when necessary.
	 *
	 * (If some prior better link was removed it would have also
	 *  removed the relay, so the relay can only match exactly or
	 *  be worse).
	 */
	RB_FOREACH(slink, h2span_link_tree, &node->tree) {
		/*
		 * Match, relay already in-place, get the next
		 * relay to match against the next slink.
		 */
		if (relay && relay->link == slink) {
			relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
			if (--count == 0)
				break;
			continue;
		}

		/*
		 * We might want this SLINK, if it passes our filters.
		 *
		 * The spanning tree can cause closed loops so we have
		 * to limit slink->dist.
		 *
		 * NOTE(review): every filter rejection below 'break's out
		 * of the loop entirely rather than 'continue'-ing to the
		 * next link — confirm this is intended (links iterate
		 * best-dist first and only the top links are propagated,
		 * so remaining links are abandoned as well).
		 */
		if (slink->dist > DMSG_SPAN_MAXDIST)
			break;

		/*
		 * Don't bother transmitting a LNK_SPAN out the same
		 * connection it came in on.  Trivial optimization.
		 */
		if (slink->state->iocom == conn->state->iocom)
			break;

		/*
		 * NOTE ON FILTERS: The protocol spec allows non-requested
		 * SPANs to be transmitted, the other end is expected to
		 * leave their transactions open but otherwise ignore them.
		 *
		 * Don't bother transmitting if the remote connection
		 * is not accepting this SPAN's peer_type.
		 */
		lspan = &slink->state->msg->any.lnk_span;
		lconn = &conn->state->msg->any.lnk_conn;
		if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0)
			break;

		/*
		 * Do not give pure clients visibility to other pure clients
		 */
		if (lconn->pfs_type == DMSG_PFSTYPE_CLIENT &&
		    lspan->pfs_type == DMSG_PFSTYPE_CLIENT) {
			break;
		}

		/*
		 * Connection filter, if cluster uuid is not NULL it must
		 * match the span cluster uuid.  Only applies when the
		 * peer_type matches.
		 */
		if (lspan->peer_type == lconn->peer_type &&
		    !uuid_is_nil(&lconn->pfs_clid, NULL) &&
		    uuid_compare(&slink->node->cls->pfs_clid,
				 &lconn->pfs_clid, NULL)) {
			break;
		}

		/*
		 * Connection filter, if cluster label is not empty it must
		 * match the span cluster label.  Only applies when the
		 * peer_type matches.
		 */
		if (lspan->peer_type == lconn->peer_type &&
		    lconn->cl_label[0] &&
		    strcmp(lconn->cl_label, slink->node->cls->cl_label)) {
			break;
		}

		/*
		 * NOTE! fs_uuid differentiates nodes within the same cluster
		 *	 so we obviously don't want to match those.  Similarly
		 *	 for fs_label.
		 */

		/*
		 * Ok, we've accepted this SPAN for relaying.  Build the
		 * relay, open a new LNK_SPAN transaction on the target
		 * connection, and hook up a router for it.
		 */
		assert(relay == NULL ||
		       relay->link->node != slink->node ||
		       relay->link->dist >= slink->dist);
		relay = dmsg_alloc(sizeof(*relay));
		relay->conn = conn;
		relay->link = slink;

		msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
					DMSG_LNK_SPAN |
					DMSGF_CREATE,
					dmsg_lnk_relay, relay);
		relay->state = msg->state;
		relay->router = dmsg_router_alloc();
		relay->router->iocom = relay->state->iocom;
		relay->router->relay = relay;
		relay->router->target = relay->state->msgid;

		/* retransmit the received SPAN with the distance bumped */
		msg->any.lnk_span = slink->state->msg->any.lnk_span;
		msg->any.lnk_span.dist = slink->dist + 1;

		dmsg_router_connect(relay->router);

		/* index the relay on both the connection and source link */
		RB_INSERT(h2span_relay_tree, &conn->tree, relay);
		TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);

		dmsg_msg_write(msg);

		fprintf(stderr,
			"RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
			"FD %d state %p\n",
			slink,
			relay,
			node->cls, node, slink->dist,
			conn->state->iocom->sock_fd, relay->state);

		/*
		 * Match (created new relay), get the next relay to
		 * match against the next slink.
		 */
		relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
		if (--count == 0)
			break;
	}

	/*
	 * Any remaining relay's belonging to this connection which match
	 * the node are in excess of the current aggregate spanning state
	 * and should be removed.
	 */
	while (relay && relay->link->node == node) {
		next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
		dmsg_relay_delete(relay);
		relay = next_relay;
	}
}
1108 
/*
 * Destroy a relay: disconnect its router, unlink it from both the
 * connection's relay tree and the source link's relay queue, terminate
 * the relayed LNK_SPAN transaction (if still open), and free it.
 */
static
void
dmsg_relay_delete(h2span_relay_t *relay)
{
	fprintf(stderr,
		"RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
		relay->link,
		relay,
		relay->link->node->cls, relay->link->node,
		relay->link->dist,
		relay->conn->state->iocom->sock_fd, relay->state);

	dmsg_router_disconnect(&relay->router);

	RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
	TAILQ_REMOVE(&relay->link->relayq, relay, entry);

	if (relay->state) {
		/* clear the state's back-pointer before replying */
		relay->state->any.relay = NULL;
		dmsg_state_reply(relay->state, 0);
		/* state invalid after reply */
		relay->state = NULL;
	}
	relay->conn = NULL;
	relay->link = NULL;
	dmsg_free(relay);
}
1136 
/*
 * Per-media volume configuration thread.  Blocks on conf->cond until
 * poked, (re)starting or stopping the media connection whenever the
 * pending copy configuration changes.  Runs until H2CONFCTL_STOP is set.
 *
 * cluster_mtx guards conf->ctl and the copy_run/copy_pend snapshots;
 * it is dropped across the blocking stop/start calls and the retry sleep.
 */
static void *
dmsg_volconf_thread(void *info)
{
	h2span_media_config_t *conf = info;

	pthread_mutex_lock(&cluster_mtx);
	while ((conf->ctl & H2CONFCTL_STOP) == 0) {
		if (conf->ctl & H2CONFCTL_UPDATE) {
			fprintf(stderr, "VOLCONF UPDATE\n");
			conf->ctl &= ~H2CONFCTL_UPDATE;
			/* nothing to do if pending config equals running */
			if (bcmp(&conf->copy_run, &conf->copy_pend,
				 sizeof(conf->copy_run)) == 0) {
				fprintf(stderr, "VOLCONF: no changes\n");
				continue;
			}
			/*
			 * XXX TODO - auto reconnect on lookup failure or
			 *		connect failure or stream failure.
			 */

			/* drop the mutex; stop/start can block */
			pthread_mutex_unlock(&cluster_mtx);
			dmsg_volconf_stop(conf);
			conf->copy_run = conf->copy_pend;
			/* only "span:<hostname>" copy paths are handled */
			if (conf->copy_run.copyid != 0 &&
			    strncmp(conf->copy_run.path, "span:", 5) == 0) {
				dmsg_volconf_start(conf,
						      conf->copy_run.path + 5);
			}
			pthread_mutex_lock(&cluster_mtx);
			fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
		}
		if (conf->state == H2MC_CONNECT) {
			/*
			 * Previous connect failed: retry, then sleep 5s
			 * with the mutex released.
			 * NOTE(review): this start is issued while holding
			 * cluster_mtx, unlike the one above — confirm
			 * dmsg_volconf_start() is safe to call locked.
			 */
			dmsg_volconf_start(conf, conf->copy_run.path + 5);
			pthread_mutex_unlock(&cluster_mtx);
			sleep(5);
			pthread_mutex_lock(&cluster_mtx);
		} else {
			/* idle until a controller pokes conf->cond */
			pthread_cond_wait(&conf->cond, &cluster_mtx);
		}
	}
	pthread_mutex_unlock(&cluster_mtx);
	dmsg_volconf_stop(conf);
	return(NULL);
}
1181 
1182 static
1183 void
1184 dmsg_volconf_stop(h2span_media_config_t *conf)
1185 {
1186 	switch(conf->state) {
1187 	case H2MC_STOPPED:
1188 		break;
1189 	case H2MC_CONNECT:
1190 		conf->state = H2MC_STOPPED;
1191 		break;
1192 	case H2MC_RUNNING:
1193 		shutdown(conf->fd, SHUT_WR);
1194 		pthread_join(conf->iocom_thread, NULL);
1195 		conf->iocom_thread = NULL;
1196 		break;
1197 	}
1198 }
1199 
1200 static
1201 void
1202 dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
1203 {
1204 	dmsg_master_service_info_t *info;
1205 
1206 	switch(conf->state) {
1207 	case H2MC_STOPPED:
1208 	case H2MC_CONNECT:
1209 		conf->fd = dmsg_connect(hostname);
1210 		if (conf->fd < 0) {
1211 			fprintf(stderr, "Unable to connect to %s\n", hostname);
1212 			conf->state = H2MC_CONNECT;
1213 		} else {
1214 			info = malloc(sizeof(*info));
1215 			bzero(info, sizeof(*info));
1216 			info->fd = conf->fd;
1217 			info->detachme = 0;
1218 			conf->state = H2MC_RUNNING;
1219 			pthread_create(&conf->iocom_thread, NULL,
1220 				       dmsg_master_service, info);
1221 		}
1222 		break;
1223 	case H2MC_RUNNING:
1224 		break;
1225 	}
1226 }
1227 
1228 /************************************************************************
1229  *			ROUTER AND MESSAGING HANDLES			*
1230  ************************************************************************
1231  *
1232  * Basically the idea here is to provide a stable data structure which
1233  * can be localized to the caller for higher level protocols to work with.
1234  * Depends on the context, these dmsg_handle's can be pooled by use-case
1235  * and remain persistent through a client (or mount point's) life.
1236  */
1237 
#if 0
/*
 * NOTE(review): this entire section is compiled out (#if 0) — it is a
 * skeleton for a future stable-handle API and is not part of the build.
 *
 * Obtain a stable handle on a cluster given its uuid.  This ties directly
 * into the global cluster topology, creating the structure if necessary
 * (even if the uuid does not exist or does not exist yet), and preventing
 * the structure from getting ripped out from under us while we hold a
 * pointer to it.
 */
h2span_cluster_t *
dmsg_cluster_get(uuid_t *pfs_clid)
{
	h2span_cluster_t dummy_cls;
	h2span_cluster_t *cls;

	/* stack dummy used purely as an RB_FIND search key */
	dummy_cls.pfs_clid = *pfs_clid;
	pthread_mutex_lock(&cluster_mtx);
	cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
	if (cls)
		++cls->refs;	/* hold a reference for the caller */
	pthread_mutex_unlock(&cluster_mtx);
	return (cls);
}

/*
 * Release a reference obtained via dmsg_cluster_get().  Removes and
 * frees the cluster when the last reference drops and no nodes remain.
 */
void
dmsg_cluster_put(h2span_cluster_t *cls)
{
	pthread_mutex_lock(&cluster_mtx);
	assert(cls->refs > 0);
	--cls->refs;
	if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
		RB_REMOVE(h2span_cluster_tree,
			  &cluster_tree, cls);
		dmsg_free(cls);
	}
	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * Obtain a stable handle to a specific cluster node given its uuid.
 * This handle does NOT lock in the route to the node and is typically
 * used as part of the dmsg_handle_*() API to obtain a set of
 * stable nodes.
 */
h2span_node_t *
dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
{
	/* XXX unimplemented stub (compiled out) */
}

#endif
1287 
#if 0
/*
 * NOTE(review): compiled out (#if 0) — unimplemented stubs for a future
 * persistent-router API.
 *
 * Acquire a persistent router structure given the cluster and node ids.
 * Messages can be transacted via this structure while held.  If the route
 * is lost messages will return failure.
 */
dmsg_router_t *
dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
{
	/* XXX unimplemented stub (compiled out) */
}

/*
 * Release previously acquired router.
 */
void
dmsg_router_put(dmsg_router_t *router)
{
	/* XXX unimplemented stub (compiled out) */
}
#endif
1307 
1308 /*
1309  * Dumps the spanning tree
1310  */
1311 void
1312 dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
1313 {
1314 	h2span_cluster_t *cls;
1315 	h2span_node_t *node;
1316 	h2span_link_t *slink;
1317 	char *uustr = NULL;
1318 
1319 	pthread_mutex_lock(&cluster_mtx);
1320 	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
1321 		dmsg_router_printf(router, "Cluster %s (%s)\n",
1322 				   dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
1323 				   cls->cl_label);
1324 		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
1325 			dmsg_router_printf(router, "    Node %s (%s)\n",
1326 				dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
1327 				node->fs_label);
1328 			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
1329 				dmsg_router_printf(router,
1330 					    "\tLink dist=%d via %d\n",
1331 					    slink->dist,
1332 					    slink->state->iocom->sock_fd);
1333 			}
1334 		}
1335 	}
1336 	pthread_mutex_unlock(&cluster_mtx);
1337 	if (uustr)
1338 		free(uustr);
1339 #if 0
1340 	TAILQ_FOREACH(conn, &connq, entry) {
1341 	}
1342 #endif
1343 }
1344