xref: /dragonfly/sys/dev/disk/xdisk/xdisk.c (revision cab8bf9b)
1 /*
2  * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
41  * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  */
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/tree.h>
60 #include <sys/udev.h>
61 #include <sys/uuid.h>
62 #include <sys/kern_syscall.h>
63 
64 #include <sys/dmsg.h>
65 #include <sys/xdiskioctl.h>
66 
67 #include <sys/buf2.h>
68 #include <sys/thread2.h>
69 
70 struct xa_softc;
71 struct xa_softc_tree;
72 RB_HEAD(xa_softc_tree, xa_softc);
73 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
74 
75 /*
76  * Track a BIO tag
77  */
struct xa_tag {
	TAILQ_ENTRY(xa_tag) entry;	/* on sc->tag_freeq or tag_pendq */
	struct xa_softc	*sc;		/* owning device softc */
	dmsg_blk_error_t status;	/* completion status from remote */
	kdmsg_state_t	*state;		/* open DMSG transaction, if any */
	struct bio	*bio;		/* BIO being serviced (may be NULL) */
	int		waiting;	/* a thread is sleeping on this tag */
	int		async;		/* auto-release tag on completion */
	int		done;		/* transaction has completed */
};

typedef struct xa_tag	xa_tag_t;
90 
91 /*
92  * Track devices.
93  */
struct xa_softc {
	struct kdmsg_state_list spanq;	/* LNK_SPANs referencing this disk */
	RB_ENTRY(xa_softc) rbnode;	/* xa_device_tree linkage (by fs_label) */
	cdev_t		dev;		/* /dev/xa%d device, NULL if destroyed */
	struct disk_info info;		/* media geometry advertised to disk layer */
	struct disk	disk;
	uuid_t		pfs_fsid;
	int		unit;		/* xa%d unit number */
	int		opencnt;	/* open() references */
	int		spancnt;	/* live LNK_SPANs on spanq */
	uint64_t	keyid;
	int		serializing;	/* interlocks open/BLK_OPEN/teardown */
	int		last_error;	/* result of last BLK_OPEN attempt */
	char		cl_label[64];   /* from LNK_SPAN cl_label (host/dev) */
	char		fs_label[64];   /* from LNK_SPAN fs_label (serno str) */
	xa_tag_t	*open_tag;	/* tag holding the BLK_OPEN transaction */
	TAILQ_HEAD(, bio) bioq;		/* pending BIOs */
	TAILQ_HEAD(, xa_tag) tag_freeq;	/* available I/O tags */
	TAILQ_HEAD(, xa_tag) tag_pendq;	/* running I/O tags */
	struct lwkt_token tok;		/* protects softc queues and state */
};

typedef struct xa_softc	xa_softc_t;
117 
/*
 * Per-attachment iocom wrapper.  dummysc is scratch space used only as a
 * lookup key for RB_FIND (its fs_label/cl_label are filled from incoming
 * LNK_SPAN messages).
 */
struct xa_iocom {
	TAILQ_ENTRY(xa_iocom) entry;	/* on xaiocomq */
	kdmsg_iocom_t	iocom;		/* embedded DMSG communications core */
	xa_softc_t	dummysc;	/* RB_FIND key scratch, not a real device */
};

typedef struct xa_iocom xa_iocom_t;
125 
126 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
127 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
128 static struct xa_softc_tree xa_device_tree;
129 
130 #define MAXTAGS		64	/* no real limit */
131 
132 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
133 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
134 static void xaio_exit(kdmsg_iocom_t *iocom);
135 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
136 
137 static void xa_terminate_check(struct xa_softc *sc);
138 
139 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
140 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
141 static void xa_done(xa_tag_t *tag, int wasbio);
142 static void xa_release(xa_tag_t *tag, int wasbio);
143 static uint32_t xa_wait(xa_tag_t *tag);
144 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
145 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
146 static void xa_restart_deferred(xa_softc_t *sc);
147 
148 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
149 
150 /*
151  * Control device, issue ioctls to create xa devices.
152  */
153 static d_open_t xdisk_open;
154 static d_close_t xdisk_close;
155 static d_ioctl_t xdisk_ioctl;
156 
157 static struct dev_ops xdisk_ops = {
158 	{ "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
159         .d_open =	xdisk_open,
160         .d_close =	xdisk_close,
161         .d_ioctl =	xdisk_ioctl
162 };
163 
164 /*
165  * XA disk devices
166  */
167 static d_open_t xa_open;
168 static d_close_t xa_close;
169 static d_ioctl_t xa_ioctl;
170 static d_strategy_t xa_strategy;
171 static d_psize_t xa_size;
172 
173 static struct dev_ops xa_ops = {
174 	{ "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
175         .d_open =	xa_open,
176         .d_close =	xa_close,
177         .d_ioctl =	xa_ioctl,
178         .d_read =	physread,
179         .d_write =	physwrite,
180         .d_strategy =	xa_strategy,
181 	.d_psize =	xa_size
182 };
183 
184 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
185 static int xdisk_opencount;
186 static cdev_t xdisk_dev;
187 static TAILQ_HEAD(, xa_iocom) xaiocomq;
188 
189 /*
190  * Module initialization
191  */
static int
xdisk_modevent(module_t mod, int type, void *data)
{
	switch (type) {
	case MOD_LOAD:
		/* Set up global state and create the control device */
		TAILQ_INIT(&xaiocomq);
		RB_INIT(&xa_device_tree);
		xdisk_dev = make_dev(&xdisk_ops, 0,
				     UID_ROOT, GID_WHEEL, 0600, "xdisk");
		break;
	case MOD_UNLOAD:
	case MOD_SHUTDOWN:
		/*
		 * Refuse to unload while the control device is open or
		 * any attachment (iocom) still exists.
		 */
		if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
			return (EBUSY);
		if (xdisk_dev) {
			destroy_dev(xdisk_dev);
			xdisk_dev = NULL;
		}
		dev_ops_remove_all(&xdisk_ops);
		dev_ops_remove_all(&xa_ops);
		break;
	default:
		break;
	}
	return 0;
}
218 
219 DEV_MODULE(xdisk, xdisk_modevent, 0);
220 
221 static int
222 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
223 {
224 	return(strcmp(sc1->fs_label, sc2->fs_label));
225 }
226 
227 /*
228  * Control device
229  */
230 static int
231 xdisk_open(struct dev_open_args *ap)
232 {
233 	lwkt_gettoken(&xdisk_token);
234 	++xdisk_opencount;
235 	lwkt_reltoken(&xdisk_token);
236 	return(0);
237 }
238 
239 static int
240 xdisk_close(struct dev_close_args *ap)
241 {
242 	lwkt_gettoken(&xdisk_token);
243 	--xdisk_opencount;
244 	lwkt_reltoken(&xdisk_token);
245 	return(0);
246 }
247 
248 static int
249 xdisk_ioctl(struct dev_ioctl_args *ap)
250 {
251 	int error;
252 
253 	switch(ap->a_cmd) {
254 	case XDISKIOCATTACH:
255 		error = xdisk_attach((void *)ap->a_data);
256 		break;
257 	case XDISKIOCDETACH:
258 		error = xdisk_detach((void *)ap->a_data);
259 		break;
260 	default:
261 		error = ENOTTY;
262 		break;
263 	}
264 	return error;
265 }
266 
267 /************************************************************************
268  *				DMSG INTERFACE				*
269  ************************************************************************/
270 
271 static int
272 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
273 {
274 	xa_iocom_t *xaio;
275 	struct file *fp;
276 
277 	/*
278 	 * Normalize ioctl params
279 	 */
280 	kprintf("xdisk_attach1\n");
281 	fp = holdfp(curproc->p_fd, xaioc->fd, -1);
282 	if (fp == NULL)
283 		return EINVAL;
284 	kprintf("xdisk_attach2\n");
285 
286 	/*
287 	 * See if the serial number is already present.  If we are
288 	 * racing a termination the disk subsystem may still have
289 	 * duplicate entries not yet removed so we wait a bit and
290 	 * retry.
291 	 */
292 	lwkt_gettoken(&xdisk_token);
293 
294 	xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
295 	kprintf("xdisk_attach3\n");
296 	kdmsg_iocom_init(&xaio->iocom, xaio,
297 			 KDMSG_IOCOMF_AUTOCONN,
298 			 M_XDISK, xaio_rcvdmsg);
299 	xaio->iocom.exit_func = xaio_exit;
300 
301 	kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
302 
303 	/*
304 	 * Setup our LNK_CONN advertisement for autoinitiate.
305 	 *
306 	 * Our filter is setup to only accept PEER_BLOCK/SERVER
307 	 * advertisements.
308 	 *
309 	 * We need a unique pfs_fsid to avoid confusion.
310 	 */
311 	xaio->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
312 	xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
313 	xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
314 	xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
315 	xaio->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
316 	ksnprintf(xaio->iocom.auto_lnk_conn.fs_label,
317 		  sizeof(xaio->iocom.auto_lnk_conn.fs_label),
318 		  "xdisk");
319 	kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1);
320 
321 	/*
322 	 * Setup our LNK_SPAN advertisement for autoinitiate
323 	 */
324 	TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
325 	kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
326 	lwkt_reltoken(&xdisk_token);
327 
328 	return 0;
329 }
330 
/*
 * XDISKIOCDETACH: not implemented yet, always fails with EINVAL.
 * Teardown currently only occurs via xaio_exit() when the iocom
 * disconnects.
 */
static int
xdisk_detach(struct xdisk_attach_ioctl *xaioc)
{
	return EINVAL;
}
336 
337 /*
338  * Called from iocom core transmit thread upon disconnect.
339  */
static
void
xaio_exit(kdmsg_iocom_t *iocom)
{
	xa_iocom_t *xaio = iocom->handle;

	kprintf("xdisk_detach -xaio_exit\n");

	/* Unhook from the global attachment list and release the iocom */
	lwkt_gettoken(&xdisk_token);
	TAILQ_REMOVE(&xaiocomq, xaio, entry);
	lwkt_reltoken(&xdisk_token);

	kfree(xaio, M_XDISK);
}
353 
354 /*
355  * Called from iocom core to handle messages that the iocom core does not
356  * handle itself and for which a state function callback has not yet been
357  * established.
358  *
359  * We primarily care about LNK_SPAN transactions here.
360  */
static int
xaio_rcvdmsg(kdmsg_msg_t *msg)
{
	kdmsg_state_t	*state = msg->state;
	xa_iocom_t	*xaio = state->iocom->handle;
	xa_softc_t	*sc;

	kprintf("xdisk_rcvdmsg %08x\n", msg->any.head.cmd);
	lwkt_gettoken(&xdisk_token);

	switch(msg->tcmd) {
	case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * A LNK_SPAN transaction which is opened and closed
		 * degenerately is not useful to us, just ignore it.
		 */
		kdmsg_msg_reply(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_CREATE:
		/*
		 * Manage the tracking node for the remote LNK_SPAN.
		 *
		 * Return a streaming result, leaving the transaction open
		 * in both directions to allow sub-transactions.
		 */
		/*
		 * Copy the advertised labels into the scratch dummysc,
		 * force NUL termination, and use it as the RB lookup key.
		 */
		bcopy(msg->any.lnk_span.cl_label, xaio->dummysc.cl_label,
		      sizeof(xaio->dummysc.cl_label));
		xaio->dummysc.cl_label[sizeof(xaio->dummysc.cl_label) - 1] = 0;

		bcopy(msg->any.lnk_span.fs_label, xaio->dummysc.fs_label,
		      sizeof(xaio->dummysc.fs_label));
		xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;

		kprintf("xdisk: %s LNK_SPAN create ",
			msg->any.lnk_span.fs_label);

		sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
		if (sc == NULL) {
			/* First SPAN for this serial number: create device */
			xa_softc_t *sctmp;
			xa_tag_t *tag;
			cdev_t dev;
			int unit;
			int n;

			sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
			kprintf("(not found - create %p)\n", sc);
			bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
			      sizeof(sc->cl_label));
			sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
			bcopy(msg->any.lnk_span.fs_label, sc->fs_label,
			      sizeof(sc->fs_label));
			sc->fs_label[sizeof(sc->fs_label) - 1] = 0;

			/* Find lowest unused unit number.  XXX FIXME O(N^2) */
			unit = -1;
			do {
				++unit;
				RB_FOREACH(sctmp, xa_softc_tree,
					   &xa_device_tree) {
					if (sctmp->unit == unit)
						break;
				}
			} while (sctmp);

			sc->unit = unit;
			sc->serializing = 1;	/* eaten by xa_restart_deferred */
			sc->spancnt = 1;
			lwkt_token_init(&sc->tok, "xa");
			TAILQ_INIT(&sc->spanq);
			TAILQ_INIT(&sc->bioq);
			TAILQ_INIT(&sc->tag_freeq);
			TAILQ_INIT(&sc->tag_pendq);
			RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
			TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
			msg->state->any.xa_sc = sc;

			/*
			 * Setup block device
			 */
			/* Pre-allocate the fixed pool of I/O tags */
			for (n = 0; n < MAXTAGS; ++n) {
				tag = kmalloc(sizeof(*tag),
					      M_XDISK, M_WAITOK|M_ZERO);
				tag->sc = sc;
				TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
			}

			if (sc->dev == NULL) {
				dev = disk_create(unit, &sc->disk, &xa_ops);
				dev->si_drv1 = sc;
				sc->dev = dev;
			}

			/*
			 * Media geometry comes from the SPAN advertisement;
			 * guard against a zero blksize to avoid div-by-zero.
			 */
			sc->info.d_media_blksize =
				msg->any.lnk_span.media.block.blksize;
			if (sc->info.d_media_blksize <= 0)
				sc->info.d_media_blksize = 1;
			sc->info.d_media_blocks =
				msg->any.lnk_span.media.block.bytes /
				sc->info.d_media_blksize;
			sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
			sc->info.d_secpertrack = 32;
			sc->info.d_nheads = 64;
			sc->info.d_secpercyl = sc->info.d_secpertrack *
					       sc->info.d_nheads;
			sc->info.d_ncylinders = 0;
			if (sc->fs_label[0])
				sc->info.d_serialno = sc->fs_label;
			disk_setdiskinfo_sync(&sc->disk, &sc->info);
			xa_restart_deferred(sc);	/* eats serializing */
		} else {
			/* Additional SPAN for an existing device */
			kprintf("(found spancnt %d sc=%p)\n", sc->spancnt, sc);
			++sc->spancnt;
			TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
			msg->state->any.xa_sc = sc;
			if (sc->serializing == 0 && sc->open_tag == NULL) {
				sc->serializing = 1;
				xa_restart_deferred(sc); /* eats serializing */
			}
		}
		kdmsg_msg_result(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_DELETE:
	case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
		/*
		 * Manage the tracking node for the remote LNK_SPAN.
		 *
		 * Return a final result, closing our end of the transaction.
		 */
		sc = msg->state->any.xa_sc;
		kprintf("xdisk: %s LNK_SPAN terminate\n", sc->fs_label);
		msg->state->any.xa_sc = NULL;
		TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
		--sc->spancnt;
		xa_terminate_check(sc);	/* may free sc */
		kdmsg_msg_reply(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_REPLY:
		/*
		 * Ignore unimplemented streaming replies on our LNK_SPAN
		 * transaction.
		 */
		break;
	case DMSG_DBG_SHELL:
		/*
		 * Execute shell command (not supported atm).
		 *
		 * This is a one-way packet but if not (e.g. if part of
		 * a streaming transaction), we will have already closed
		 * our end.
		 */
		kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	case DMSG_DBG_SHELL | DMSGF_REPLY:
		/*
		 * Receive one or more replies to a shell command
		 * that we sent.  Just dump it to the console.
		 *
		 * This is a one-way packet but if not (e.g. if
		 * part of a streaming transaction), we will have
		 * already closed our end.
		 */
		if (msg->aux_data) {
			msg->aux_data[msg->aux_size - 1] = 0;
			kprintf("xdisk: DEBUGMSG: %s\n",
				msg->aux_data);
		}
		break;
	default:
		/*
		 * Unsupported one-way message, streaming message, or
		 * transaction.
		 *
		 * Terminate any unsupported transactions with an error
		 * and ignore any unsupported streaming messages.
		 *
		 * NOTE: This case also includes DMSG_LNK_ERROR messages
		 *	 which might be one-way, replying to those would
		 *	 cause an infinite ping-pong.
		 */
		if (msg->any.head.cmd & DMSGF_CREATE)
			kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	}
	lwkt_reltoken(&xdisk_token);

	return 0;
}
548 
549 /*
550  * Determine if we can destroy the xa_softc.
551  *
552  * Called with xdisk_token held.
553  */
static
void
xa_terminate_check(struct xa_softc *sc)
{
	xa_tag_t *tag;

	/*
	 * Determine if we can destroy the softc.
	 */
	kprintf("xdisk: terminate check xa%d (%d,%d,%d) sc=%p ",
		sc->unit,
		sc->opencnt, sc->serializing, sc->spancnt,
		sc);

	/* Still referenced (open, mid-operation, or spans remain) */
	if (sc->opencnt || sc->serializing || sc->spancnt) {
		kprintf("(leave intact)\n");
		return;
	}
	kprintf("(remove from tree)\n");

	/* Block re-open attempts while we tear down */
	sc->serializing = 1;
	KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

	RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);

	if (sc->dev) {
		disk_destroy(&sc->disk);
		sc->dev->si_drv1 = NULL;
		sc->dev = NULL;
	}
	KKASSERT(sc->opencnt == 0);
	KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

	/* Free the pre-allocated tag pool, then the softc itself */
	while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
		tag->sc = NULL;
		kfree(tag, M_XDISK);
	}
	kfree(sc, M_XDISK);
}
593 
594 /************************************************************************
595  *			   XA DEVICE INTERFACE				*
596  ************************************************************************/
597 
598 static int
599 xa_open(struct dev_open_args *ap)
600 {
601 	cdev_t dev = ap->a_head.a_dev;
602 	xa_softc_t *sc;
603 	int error;
604 
605 	dev->si_bsize_phys = 512;
606 	dev->si_bsize_best = 32768;
607 
608 	/*
609 	 * Interlock open with opencnt, wait for attachment operations
610 	 * to finish.
611 	 */
612 	lwkt_gettoken(&xdisk_token);
613 again:
614 	sc = dev->si_drv1;
615 	if (sc == NULL) {
616 		lwkt_reltoken(&xdisk_token);
617 		return ENXIO;	/* raced destruction */
618 	}
619 	if (sc->serializing) {
620 		tsleep(sc, 0, "xarace", hz / 10);
621 		goto again;
622 	}
623 	sc->serializing = 1;
624 
625 	/*
626 	 * Serialize initial open
627 	 */
628 	if (sc->opencnt++ > 0) {
629 		lwkt_reltoken(&xdisk_token);
630 		return(0);
631 	}
632 	lwkt_reltoken(&xdisk_token);
633 
634 	/*
635 	 * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
636 	 */
637 	if (sc->open_tag == NULL) {
638 		xa_restart_deferred(sc); /* eats serializing */
639 	} else {
640 		sc->serializing = 0;
641 		wakeup(sc);
642 	}
643 
644 	/*
645 	 * Wait for completion of the BLK_OPEN
646 	 */
647 	lwkt_gettoken(&xdisk_token);
648 	while (sc->serializing)
649 		tsleep(sc, 0, "xaopen", hz);
650 
651 	error = sc->last_error;
652 	if (error) {
653 		KKASSERT(sc->opencnt > 0);
654 		--sc->opencnt;
655 		xa_terminate_check(sc);
656 		sc = NULL;	/* sc may be invalid now */
657 	}
658 	lwkt_reltoken(&xdisk_token);
659 
660 	return (error);
661 }
662 
static int
xa_close(struct dev_close_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *sc;
	xa_tag_t *tag;

	sc = dev->si_drv1;
	if (sc == NULL)
		return ENXIO;	/* raced destruction */
	lwkt_gettoken(&xdisk_token);
	lwkt_gettoken(&sc->tok);

	/*
	 * NOTE: Clearing open_tag allows a concurrent open to re-open
	 *	 the device and prevents autonomous completion of the tag.
	 */
	if (sc->opencnt == 1 && sc->open_tag) {
		/* Last close: shut down the BLK_OPEN transaction */
		tag = sc->open_tag;
		sc->open_tag = NULL;
		kdmsg_state_reply(tag->state, 0);	/* close our side */
		xa_wait(tag);				/* wait on remote */
	}
	lwkt_reltoken(&sc->tok);
	KKASSERT(sc->opencnt > 0);
	--sc->opencnt;
	xa_terminate_check(sc);	/* may free sc */
	lwkt_reltoken(&xdisk_token);

	return(0);
}
694 
695 static int
696 xa_strategy(struct dev_strategy_args *ap)
697 {
698 	xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
699 	xa_tag_t *tag;
700 	struct bio *bio = ap->a_bio;
701 
702 	/*
703 	 * Allow potentially temporary link failures to fail the I/Os
704 	 * only if the device is not open.  That is, we allow the disk
705 	 * probe code prior to mount to fail.
706 	 */
707 	if (sc->opencnt == 0) {
708 		bio->bio_buf->b_error = ENXIO;
709 		bio->bio_buf->b_flags |= B_ERROR;
710 		biodone(bio);
711 		return(0);
712 	}
713 
714 	tag = xa_setup_cmd(sc, bio);
715 	if (tag)
716 		xa_start(tag, NULL, 1);
717 	return(0);
718 }
719 
720 static int
721 xa_ioctl(struct dev_ioctl_args *ap)
722 {
723 	return(ENOTTY);
724 }
725 
726 static int
727 xa_size(struct dev_psize_args *ap)
728 {
729 	struct xa_softc *sc;
730 
731 	if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
732 		return (ENXIO);
733 	ap->a_result = sc->info.d_media_blocks;
734 	return (0);
735 }
736 
737 /************************************************************************
738  *		    XA BLOCK PROTOCOL STATE MACHINE			*
739  ************************************************************************
740  *
741  * Implement tag/msg setup and related functions.
742  */
743 static xa_tag_t *
744 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
745 {
746 	xa_tag_t *tag;
747 
748 	/*
749 	 * Only get a tag if we have a valid virtual circuit to the server.
750 	 */
751 	lwkt_gettoken(&sc->tok);
752 	if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
753 		TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
754 		tag->bio = bio;
755 		TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
756 	}
757 
758 	/*
759 	 * If we can't dispatch now and this is a bio, queue it for later.
760 	 */
761 	if (tag == NULL && bio) {
762 		TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
763 	}
764 	lwkt_reltoken(&sc->tok);
765 
766 	return (tag);
767 }
768 
/*
 * Issue the DMSG transaction for a tag.  If msg is NULL a BLK_*
 * message is constructed from the tag's bio; otherwise the caller's
 * message (e.g. BLK_OPEN) is sent as-is.  async selects auto-release
 * of the tag on completion.
 *
 * NOTE(review): the bio path dereferences sc->open_tag->state without
 *	 a NULL check -- callers appear to guarantee an established
 *	 BLK_OPEN before bio I/O is started; confirm.
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
{
	xa_softc_t *sc = tag->sc;

	tag->done = 0;
	tag->async = async;

	if (msg == NULL) {
		struct bio *bio;
		struct buf *bp;

		KKASSERT(tag->bio);
		bio = tag->bio;
		bp = bio->bio_buf;

		/* Translate the buf command into the matching BLK_* message */
		switch(bp->b_cmd) {
		case BUF_CMD_READ:
			msg = kdmsg_msg_alloc(sc->open_tag->state,
					      DMSG_BLK_READ |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_read.keyid = sc->keyid;
			msg->any.blk_read.offset = bio->bio_offset;
			msg->any.blk_read.bytes = bp->b_bcount;
			break;
		case BUF_CMD_WRITE:
			msg = kdmsg_msg_alloc(sc->open_tag->state,
					      DMSG_BLK_WRITE |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_write.keyid = sc->keyid;
			msg->any.blk_write.offset = bio->bio_offset;
			msg->any.blk_write.bytes = bp->b_bcount;
			msg->aux_data = bp->b_data;
			msg->aux_size = bp->b_bcount;
			break;
		case BUF_CMD_FLUSH:
			msg = kdmsg_msg_alloc(sc->open_tag->state,
					      DMSG_BLK_FLUSH |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_flush.keyid = sc->keyid;
			msg->any.blk_flush.offset = bio->bio_offset;
			msg->any.blk_flush.bytes = bp->b_bcount;
			break;
		case BUF_CMD_FREEBLKS:
			msg = kdmsg_msg_alloc(sc->open_tag->state,
					      DMSG_BLK_FREEBLKS |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_freeblks.keyid = sc->keyid;
			msg->any.blk_freeblks.offset = bio->bio_offset;
			msg->any.blk_freeblks.bytes = bp->b_bcount;
			break;
		default:
			/* unsupported buf command: fail the bio with EIO */
			bp->b_flags |= B_ERROR;
			bp->b_error = EIO;
			biodone(bio);
			tag->bio = NULL;
			break;
		}
	}

	if (msg) {
		tag->state = msg->state;
		kdmsg_msg_write(msg);
	} else {
		/* no message could be built; complete the tag with an error */
		tag->status.head.error = DMSG_ERR_IO;
		xa_done(tag, 1);
	}
}
841 
842 static uint32_t
843 xa_wait(xa_tag_t *tag)
844 {
845 	xa_softc_t *sc = tag->sc;
846 	uint32_t error;
847 
848 	kprintf("xdisk: xa_wait  %p\n", tag);
849 
850 	lwkt_gettoken(&sc->tok);
851 	tag->waiting = 1;
852 	while (tag->done == 0)
853 		tsleep(tag, 0, "xawait", 0);
854 	lwkt_reltoken(&sc->tok);
855 	error = tag->status.head.error;
856 	tag->waiting = 0;
857 	xa_release(tag, 0);
858 
859 	return error;
860 }
861 
/*
 * Mark a tag's transaction complete: wake any synchronous waiter and,
 * for async tags, release the tag immediately.  The bio must already
 * have been finalized and detached by the caller.
 */
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	KKASSERT(tag->bio == NULL);

	tag->state = NULL;
	tag->done = 1;
	if (tag->waiting)
		wakeup(tag);
	if (tag->async)
		xa_release(tag, wasbio);
}
874 
/*
 * Return a tag to the free pool, or -- if it was servicing a bio and
 * more bios are deferred on sc->bioq -- immediately reuse the tag to
 * start the next deferred bio.
 */
static
void
xa_release(xa_tag_t *tag, int wasbio)
{
	xa_softc_t *sc = tag->sc;
	struct bio *bio;

	lwkt_gettoken(&sc->tok);
	if (wasbio && (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
		/* reuse the tag for the next deferred bio */
		TAILQ_REMOVE(&sc->bioq, bio, bio_act);
		tag->bio = bio;
		lwkt_reltoken(&sc->tok);
		xa_start(tag, NULL, 1);
	} else {
		TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
		TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
		lwkt_reltoken(&sc->tok);
	}
}
894 
895 /*
896  * Handle messages under the BLKOPEN transaction.
897  */
898 static int
899 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
900 {
901 	xa_tag_t *tag = state->any.any;
902 	xa_softc_t *sc = tag->sc;
903 	struct bio *bio;
904 
905 	/*
906 	 * If the tag has been cleaned out we already closed our side
907 	 * of the transaction and we are waiting for the other side to
908 	 * close.
909 	 */
910 	if (tag == NULL) {
911 		if (msg->any.head.cmd & DMSGF_CREATE)
912 			kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
913 		return 0;
914 	}
915 
916 	/*
917 	 * Validate the tag
918 	 */
919 	lwkt_gettoken(&sc->tok);
920 
921 	/*
922 	 * Handle initial response to our open and restart any deferred
923 	 * BIOs on success.
924 	 *
925 	 * NOTE: DELETE may also be set.
926 	 */
927 	if (msg->any.head.cmd & DMSGF_CREATE) {
928 		switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
929 		case DMSG_LNK_ERROR | DMSGF_REPLY:
930 			bzero(&tag->status, sizeof(tag->status));
931 			tag->status.head = msg->any.head;
932 			break;
933 		case DMSG_BLK_ERROR | DMSGF_REPLY:
934 			tag->status = msg->any.blk_error;
935 			break;
936 		}
937 		sc->last_error = tag->status.head.error;
938 		kprintf("xdisk: blk_open completion status %d\n",
939 			sc->last_error);
940 		if (sc->last_error == 0) {
941 			while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
942 				tag = xa_setup_cmd(sc, NULL);
943 				if (tag == NULL)
944 					break;
945 				TAILQ_REMOVE(&sc->bioq, bio, bio_act);
946 				tag->bio = bio;
947 				xa_start(tag, NULL, 1);
948 			}
949 		}
950 		sc->serializing = 0;
951 		wakeup(sc);
952 	}
953 
954 	/*
955 	 * Handle unexpected termination (or lost comm channel) from other
956 	 * side.  Autonomous completion only if open_tag matches,
957 	 * otherwise another thread is probably waiting on the tag.
958 	 *
959 	 * (see xa_close() for other interactions)
960 	 */
961 	if (msg->any.head.cmd & DMSGF_DELETE) {
962 		kdmsg_state_reply(tag->state, 0);
963 		if (sc->open_tag == tag) {
964 			sc->open_tag = NULL;
965 			xa_done(tag, 0);
966 		} else {
967 			tag->async = 0;
968 			xa_done(tag, 0);
969 		}
970 	}
971 	lwkt_reltoken(&sc->tok);
972 	return (0);
973 }
974 
/*
 * Completion callback for BLK_READ/WRITE/FLUSH/FREEBLKS transactions.
 *
 * NOTE(review): unlike xa_sync_completion(), tag is dereferenced
 *	 without a NULL check -- presumably bio transactions never
 *	 outlive their tag; confirm against the kdmsg state machine.
 */
static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc = tag->sc;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		/* link-level error: synthesize a status from the header */
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * If the device is open stall the bio on DMSG errors.  If an
	 * actual I/O error occured on the remote device, DMSG_ERR_IO
	 * will be returned.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
		if (tag->status.head.error != DMSG_ERR_IO)
			goto handle_repend;
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				/* short read: zero-extend the remainder */
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			/* no payload: server short-cut an all-zeros read */
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if (tag->status.head.error != 0) {
			bp->b_error = EIO;
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *	 CREATE+DELETEs, so we won't have to terminate the
	 *	 transaction separately, here.  But just in case they
	 *	 aren't be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we put the BIO back onto the bioq for a later restart.
	 */
handle_repend:
	lwkt_gettoken(&sc->tok);
	kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
	tag->bio = NULL;
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Requeue the bio
	 */
	TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);

	lwkt_reltoken(&sc->tok);
	return (0);
}
1095 
1096 /*
1097  * Restart as much deferred I/O as we can.  The serializer is set and we
1098  * eat it (clear it) when done.
1099  *
1100  * Called with sc->tok held
1101  */
1102 static
1103 void
1104 xa_restart_deferred(xa_softc_t *sc)
1105 {
1106 	kdmsg_state_t *span;
1107 	kdmsg_msg_t *msg;
1108 	xa_tag_t *tag;
1109 	int error;
1110 
1111 	KKASSERT(sc->serializing);
1112 
1113 	/*
1114 	 * Determine if a restart is needed.
1115 	 */
1116 	if (sc->opencnt == 0) {
1117 		/*
1118 		 * Device is not open, nothing to do, eat serializing.
1119 		 */
1120 		sc->serializing = 0;
1121 		wakeup(sc);
1122 	} else if (sc->open_tag == NULL) {
1123 		/*
1124 		 * BLK_OPEN required before we can restart any BIOs.
1125 		 * Select the best LNK_SPAN to issue the BLK_OPEN under.
1126 		 *
1127 		 * serializing interlocks waiting open()s.
1128 		 */
1129 		error = 0;
1130 		TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1131 			if ((span->rxcmd & DMSGF_DELETE) == 0)
1132 				break;
1133 		}
1134 		if (span == NULL)
1135 			error = ENXIO;
1136 
1137 		if (error == 0) {
1138 			tag = xa_setup_cmd(sc, NULL);
1139 			if (tag == NULL)
1140 				error = ENXIO;
1141 		}
1142 		if (error == 0) {
1143 			kprintf("xdisk: BLK_OPEN\n");
1144 			sc->open_tag = tag;
1145 			msg = kdmsg_msg_alloc(span,
1146 					      DMSG_BLK_OPEN |
1147 					      DMSGF_CREATE,
1148 					      xa_sync_completion, tag);
1149 			msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1150 			xa_start(tag, msg, 0);
1151 		}
1152 		if (error) {
1153 			sc->serializing = 0;
1154 			wakeup(sc);
1155 		}
1156 		/* else leave serializing set until BLK_OPEN response */
1157 	} else {
1158 		/* nothing to do */
1159 		sc->serializing = 0;
1160 		wakeup(sc);
1161 	}
1162 }
1163