xref: /dragonfly/sys/dev/disk/xdisk/xdisk.c (revision 7bcb6caf)
/*
 * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module allows disk devices to be created and associated with a
 * communications pipe or socket.  You open the device and issue an
 * ioctl() to install a new disk along with its communications descriptor.
 *
 * All further communication occurs via the descriptor using the DMSG
 * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
 * cluster controller, to the local cluster controller, etc.
 *
 * /dev/xdisk is the control device; issue ioctl()s against it to create
 * the /dev/xa%d devices.  These devices look like raw disks to the system.
 */
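
/*
 * Example (illustrative sketch only): a userland attach might look like
 * the following, assuming "fd" is a descriptor already connected to the
 * remote block service.  Only the fd field is shown here; see
 * sys/xdiskioctl.h for the authoritative layout of the ioctl structure.
 *
 *	struct xdisk_attach_ioctl xaioc;
 *	int ctl;
 *
 *	bzero(&xaioc, sizeof(xaioc));
 *	xaioc.fd = fd;
 *	ctl = open("/dev/xdisk", O_RDWR);
 *	if (ctl < 0 || ioctl(ctl, XDISKIOCATTACH, &xaioc) < 0)
 *		err(1, "xdisk attach");
 */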
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/buf.h>
#include <sys/conf.h>
#include <sys/device.h>
#include <sys/devicestat.h>
#include <sys/disk.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/udev.h>
#include <sys/uuid.h>
#include <sys/kern_syscall.h>

#include <sys/dmsg.h>
#include <sys/xdiskioctl.h>

#include <sys/buf2.h>
#include <sys/thread2.h>

struct xa_softc;
struct xa_softc_tree;
RB_HEAD(xa_softc_tree, xa_softc);
RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);

static int xa_active;
SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
	   "Number of active xdisk IOs");
static uint64_t xa_last;
SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
	   "Offset of last xdisk IO");
static int xa_debug = 1;
SYSCTL_INT(_debug, OID_AUTO, xa_debug, CTLFLAG_RW, &xa_debug, 0,
	   "xdisk debugging");

/*
 * Track a BIO tag
 */
struct xa_tag {
	TAILQ_ENTRY(xa_tag) entry;
	struct xa_softc	*sc;
	dmsg_blk_error_t status;
	kdmsg_state_t	*state;
	struct bio	*bio;
	int		waiting;
	int		async;
	int		done;
};

typedef struct xa_tag	xa_tag_t;

/*
 * Track devices.
 */
struct xa_softc {
	struct kdmsg_state_list spanq;
	RB_ENTRY(xa_softc) rbnode;
	cdev_t		dev;
	struct devstat	stats;
	struct disk_info info;
	struct disk	disk;
	uuid_t		peer_id;
	int		unit;
	int		opencnt;
	int		spancnt;
	uint64_t	keyid;
	int		serializing;
	int		last_error;
	int		terminating;
	char		peer_label[64];	/* from LNK_SPAN host/dev */
	char		pfs_label[64];	/* from LNK_SPAN serno */
	xa_tag_t	*open_tag;
	TAILQ_HEAD(, bio) bioq;		/* pending BIOs */
	TAILQ_HEAD(, xa_tag) tag_freeq;	/* available I/O tags */
	TAILQ_HEAD(, xa_tag) tag_pendq;	/* running I/O tags */
	struct lock	lk;
};

typedef struct xa_softc	xa_softc_t;

struct xa_iocom {
	TAILQ_ENTRY(xa_iocom) entry;
	kdmsg_iocom_t	iocom;
	xa_softc_t	dummysc;
};

typedef struct xa_iocom xa_iocom_t;

static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
static struct xa_softc_tree xa_device_tree;

#define MAXTAGS		64	/* no real limit */

static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
static void xaio_exit(kdmsg_iocom_t *iocom);
static int xaio_rcvdmsg(kdmsg_msg_t *msg);

static void xa_terminate_check(struct xa_softc *sc);

static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
static void xa_done(xa_tag_t *tag, int wasbio);
static void xa_release(xa_tag_t *tag, int wasbio);
static uint32_t xa_wait(xa_tag_t *tag);
static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static void xa_restart_deferred(xa_softc_t *sc);
#define xa_printf(level, ctl, ...)	\
	do { if (xa_debug >= (level)) kprintf("xdisk: " ctl, __VA_ARGS__); } while(0)
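
/*
 * Debug verbosity levels as used in this file: 0 prints unconditionally
 * (DEBUGMSG output), 1 attach/open/close events, 2 span count changes,
 * 3 LNK_SPAN state transitions, 4 raw received-message traces.
 */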

MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");

/*
 * Control device, issue ioctls to create xa devices.
 */
static d_open_t xdisk_open;
static d_close_t xdisk_close;
static d_ioctl_t xdisk_ioctl;

static struct dev_ops xdisk_ops = {
	{ "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xdisk_open,
	.d_close =	xdisk_close,
	.d_ioctl =	xdisk_ioctl
};

/*
 * XA disk devices
 */
static d_open_t xa_open;
static d_close_t xa_close;
static d_ioctl_t xa_ioctl;
static d_strategy_t xa_strategy;
static d_psize_t xa_size;

static struct dev_ops xa_ops = {
	{ "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xa_open,
	.d_close =	xa_close,
	.d_ioctl =	xa_ioctl,
	.d_read =	physread,
	.d_write =	physwrite,
	.d_strategy =	xa_strategy,
	.d_psize =	xa_size
};

static int xdisk_opencount;
static cdev_t xdisk_dev;
struct lock xdisk_lk;
static TAILQ_HEAD(, xa_iocom) xaiocomq;

/*
 * Module initialization
 */
static int
xdisk_modevent(module_t mod, int type, void *data)
{
	switch (type) {
	case MOD_LOAD:
		TAILQ_INIT(&xaiocomq);
		RB_INIT(&xa_device_tree);
		lockinit(&xdisk_lk, "xdisk", 0, 0);
		xdisk_dev = make_dev(&xdisk_ops, 0,
				     UID_ROOT, GID_WHEEL, 0600, "xdisk");
		break;
	case MOD_UNLOAD:
	case MOD_SHUTDOWN:
		if (!RB_EMPTY(&xa_device_tree))
			return (EBUSY);
		if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
			return (EBUSY);
		if (xdisk_dev) {
			destroy_dev(xdisk_dev);
			xdisk_dev = NULL;
		}
		dev_ops_remove_all(&xdisk_ops);
		dev_ops_remove_all(&xa_ops);
		break;
	default:
		break;
	}
	return 0;
}

DEV_MODULE(xdisk, xdisk_modevent, 0);

static int
xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
{
	return(strcmp(sc1->pfs_label, sc2->pfs_label));
}

/*
 * Control device
 */
static int
xdisk_open(struct dev_open_args *ap)
{
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	++xdisk_opencount;
	lockmgr(&xdisk_lk, LK_RELEASE);
	return(0);
}

static int
xdisk_close(struct dev_close_args *ap)
{
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	--xdisk_opencount;
	lockmgr(&xdisk_lk, LK_RELEASE);
	return(0);
}

static int
xdisk_ioctl(struct dev_ioctl_args *ap)
{
	int error;

	switch(ap->a_cmd) {
	case XDISKIOCATTACH:
		error = xdisk_attach((void *)ap->a_data);
		break;
	case XDISKIOCDETACH:
		error = xdisk_detach((void *)ap->a_data);
		break;
	default:
		error = ENOTTY;
		break;
	}
	return error;
}

/************************************************************************
 *				DMSG INTERFACE				*
 ************************************************************************/

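/*
 * Attach flow in brief: XDISKIOCATTACH hands us a descriptor, which
 * xdisk_attach() wraps in a kdmsg iocom; we advertise LNK_CONN and let
 * the iocom autoinitiate.  Remote LNK_SPANs then arrive through
 * xaio_rcvdmsg(), which creates or revalidates the backing xa_softc.
 */
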
static int
xdisk_attach(struct xdisk_attach_ioctl *xaioc)
{
	xa_iocom_t *xaio;
	struct file *fp;

	/*
	 * Normalize ioctl params
	 */
	fp = holdfp(curthread, xaioc->fd, -1);
	if (fp == NULL)
		return EINVAL;
	xa_printf(1, "xdisk_attach fp=%p\n", fp);

	/*
	 * See if the serial number is already present.  If we are
	 * racing a termination the disk subsystem may still have
	 * duplicate entries not yet removed so we wait a bit and
	 * retry.
	 */
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);

	xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
	kdmsg_iocom_init(&xaio->iocom, xaio,
			 KDMSG_IOCOMF_AUTOCONN,
			 M_XDISK, xaio_rcvdmsg);
	xaio->iocom.exit_func = xaio_exit;

	kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");

	/*
	 * Setup our LNK_CONN advertisement for autoinitiate.
	 *
	 * Our filter is setup to only accept PEER_BLOCK advertisements.
	 * XXX no peer_id filter.
	 *
	 * We need a unique pfs_fsid to avoid confusion.
	 */
	xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_CLIENT;
	xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
	xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
	ksnprintf(xaio->iocom.auto_lnk_conn.peer_label,
		  sizeof(xaio->iocom.auto_lnk_conn.peer_label),
		  "%s/xdisk",
		  hostname);
	/* kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1); */

	/*
	 * Setup our LNK_SPAN advertisement for autoinitiate
	 */
	TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
	kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);

	lockmgr(&xdisk_lk, LK_RELEASE);

	return 0;
}

static int
xdisk_detach(struct xdisk_attach_ioctl *xaioc)
{
	return EINVAL;
}

/*
 * Called from iocom core transmit thread upon disconnect.
 */
static
void
xaio_exit(kdmsg_iocom_t *iocom)
{
	xa_iocom_t *xaio = iocom->handle;

	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	xa_printf(1, "%s", "xdisk_detach [xaio_exit()]\n");
	TAILQ_REMOVE(&xaiocomq, xaio, entry);
	lockmgr(&xdisk_lk, LK_RELEASE);

	kdmsg_iocom_uninit(&xaio->iocom);

	kfree(xaio, M_XDISK);
}

/*
 * Called from iocom core to handle messages that the iocom core does not
 * handle itself and for which a state function callback has not yet been
 * established.
 *
 * We primarily care about LNK_SPAN transactions here.
 */
static int
xaio_rcvdmsg(kdmsg_msg_t *msg)
{
	kdmsg_state_t	*state = msg->state;
	xa_iocom_t	*xaio = state->iocom->handle;
	xa_softc_t	*sc;

	if (state) {
		xa_printf(4,
			"xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
			state, state->rxcmd, state->txcmd,
			msg->any.head.cmd);
	}
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);

	switch(msg->tcmd) {
	case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * A LNK_SPAN transaction which is opened and closed
		 * degenerately is not useful to us, just ignore it.
		 */
		kdmsg_msg_reply(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_CREATE:
		/*
		 * Manage the tracking node for the remote LNK_SPAN.
		 *
		 * Return a streaming result, leaving the transaction open
		 * in both directions to allow sub-transactions.
		 */
		bcopy(msg->any.lnk_span.peer_label, xaio->dummysc.peer_label,
		      sizeof(xaio->dummysc.peer_label));
		xaio->dummysc.peer_label[
			sizeof(xaio->dummysc.peer_label) - 1] = 0;

		bcopy(msg->any.lnk_span.pfs_label, xaio->dummysc.pfs_label,
		      sizeof(xaio->dummysc.pfs_label));
		xaio->dummysc.pfs_label[
			sizeof(xaio->dummysc.pfs_label) - 1] = 0;

		xa_printf(3, "LINK_SPAN state %p create for %s\n",
			  msg->state, msg->any.lnk_span.pfs_label);

		sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
		if (sc == NULL) {
			xa_softc_t *sctmp;
			xa_tag_t *tag;
			cdev_t dev;
			int unit;
			int n;

			sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
			bcopy(msg->any.lnk_span.peer_label, sc->peer_label,
			      sizeof(sc->peer_label));
			sc->peer_label[sizeof(sc->peer_label) - 1] = 0;
			bcopy(msg->any.lnk_span.pfs_label, sc->pfs_label,
			      sizeof(sc->pfs_label));
			sc->pfs_label[sizeof(sc->pfs_label) - 1] = 0;

			/* XXX FIXME O(N^2) */
			unit = -1;
			do {
				++unit;
				RB_FOREACH(sctmp, xa_softc_tree,
					   &xa_device_tree) {
					if (sctmp->unit == unit)
						break;
				}
			} while (sctmp);

			sc->unit = unit;
			sc->serializing = 1;
			sc->spancnt = 1;
			lockinit(&sc->lk, "xalk", 0, 0);
			TAILQ_INIT(&sc->spanq);
			TAILQ_INIT(&sc->bioq);
			TAILQ_INIT(&sc->tag_freeq);
			TAILQ_INIT(&sc->tag_pendq);

			lockmgr(&sc->lk, LK_EXCLUSIVE);
			RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
			TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
			msg->state->any.xa_sc = sc;

			/*
			 * Setup block device
			 */
			for (n = 0; n < MAXTAGS; ++n) {
				tag = kmalloc(sizeof(*tag),
					      M_XDISK, M_WAITOK|M_ZERO);
				tag->sc = sc;
				TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
			}

			if (sc->dev == NULL) {
				dev = disk_create(unit, &sc->disk, &xa_ops);
				dev->si_drv1 = sc;
				sc->dev = dev;
				devstat_add_entry(&sc->stats, "xa", unit,
						  DEV_BSIZE,
						  DEVSTAT_NO_ORDERED_TAGS,
						  DEVSTAT_TYPE_DIRECT |
						  DEVSTAT_TYPE_IF_OTHER,
						  DEVSTAT_PRIORITY_OTHER);
			}

			sc->info.d_media_blksize =
				msg->any.lnk_span.media.block.blksize;
			if (sc->info.d_media_blksize <= 0)
				sc->info.d_media_blksize = 1;
			sc->info.d_media_blocks =
				msg->any.lnk_span.media.block.bytes /
				sc->info.d_media_blksize;
			sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
			sc->info.d_secpertrack = 32;
			sc->info.d_nheads = 64;
			sc->info.d_secpercyl = sc->info.d_secpertrack *
					       sc->info.d_nheads;
			sc->info.d_ncylinders = 0;
			if (sc->pfs_label[0])
				sc->info.d_serialno = sc->pfs_label;
			/*
			 * WARNING! disk_setdiskinfo() must be asynchronous
			 *	    because we are in the rxmsg thread.  If
			 *	    it is synchronous and issues more disk
			 *	    I/Os, we will deadlock.
			 */
			disk_setdiskinfo(&sc->disk, &sc->info);
			xa_restart_deferred(sc);	/* eats serializing */
			lockmgr(&sc->lk, LK_RELEASE);
		} else {
			lockmgr(&sc->lk, LK_EXCLUSIVE);
			++sc->spancnt;
			TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
			msg->state->any.xa_sc = sc;
			if (sc->serializing == 0 && sc->open_tag == NULL) {
				sc->serializing = 1;
				xa_restart_deferred(sc); /* eats serializing */
			}
			lockmgr(&sc->lk, LK_RELEASE);
			if (sc->dev && sc->dev->si_disk) {
				xa_printf(1, "reprobe disk: %s\n",
					  sc->pfs_label);
				disk_msg_send(DISK_DISK_REPROBE,
					      sc->dev->si_disk,
					      NULL);
			}
		}
		xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
		kdmsg_msg_result(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_DELETE:
		/*
		 * Manage the tracking node for the remote LNK_SPAN.
		 *
		 * Return a final result, closing our end of the transaction.
		 */
		sc = msg->state->any.xa_sc;
		xa_printf(3, "LINK_SPAN state %p delete for %s (sc=%p)\n",
			  msg->state, (sc ? sc->pfs_label : "(null)"), sc);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		msg->state->any.xa_sc = NULL;
		TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
		--sc->spancnt;

		xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);

		/*
		 * Spans can come and go as the graph stabilizes, so if
		 * we lose a span along with sc->open_tag we may be able
		 * to restart the I/Os on a different span.
		 */
		if (sc->spancnt &&
		    sc->serializing == 0 && sc->open_tag == NULL) {
			sc->serializing = 1;
			xa_restart_deferred(sc);
		}
		lockmgr(&sc->lk, LK_RELEASE);
		kdmsg_msg_reply(msg, 0);

#if 0
		/*
		 * Termination
		 */
		if (sc->spancnt == 0)
			xa_terminate_check(sc);
#endif
		break;
	case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
		/*
		 * Ignore unimplemented streaming replies on our LNK_SPAN
		 * transaction.
		 */
		xa_printf(3, "LINK_SPAN state %p delete+reply\n",
			  msg->state);
		break;
	case DMSG_LNK_SPAN | DMSGF_REPLY:
		/*
		 * Ignore unimplemented streaming replies on our LNK_SPAN
		 * transaction.
		 */
		xa_printf(3, "LINK_SPAN state %p reply\n",
			  msg->state);
		break;
	case DMSG_DBG_SHELL:
		/*
		 * Execute shell command (not supported atm).
		 *
		 * This is a one-way packet but if not (e.g. if part of
		 * a streaming transaction), we will have already closed
		 * our end.
		 */
		kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	case DMSG_DBG_SHELL | DMSGF_REPLY:
		/*
		 * Receive one or more replies to a shell command
		 * that we sent.  Just dump it to the console.
		 *
		 * This is a one-way packet but if not (e.g. if
		 * part of a streaming transaction), we will have
		 * already closed our end.
		 */
		if (msg->aux_data) {
			msg->aux_data[msg->aux_size - 1] = 0;
			xa_printf(0, "DEBUGMSG: %s\n", msg->aux_data);
		}
		break;
	default:
		/*
		 * Unsupported one-way message, streaming message, or
		 * transaction.
		 *
		 * Terminate any unsupported transactions with an error
		 * and ignore any unsupported streaming messages.
		 *
		 * NOTE: This case also includes DMSG_LNK_ERROR messages
		 *	 which might be one-way, replying to those would
		 *	 cause an infinite ping-pong.
		 */
		if (msg->any.head.cmd & DMSGF_CREATE)
			kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	}
	lockmgr(&xdisk_lk, LK_RELEASE);

	return 0;
}

/*
 * Determine if we can destroy the xa_softc.
 *
 * Called with xdisk_lk held.
 */
static
void
xa_terminate_check(struct xa_softc *sc)
{
	xa_tag_t *tag;

	/*
	 * Determine if we can destroy the softc.
	 */
	xa_printf(1, "Terminate check xa%d (%d,%d,%d) sc=%p ",
		sc->unit,
		sc->opencnt, sc->serializing, sc->spancnt,
		sc);

	if (sc->opencnt || sc->serializing || sc->spancnt ||
	    TAILQ_FIRST(&sc->bioq) || TAILQ_FIRST(&sc->tag_pendq)) {
		xa_printf(1, "%s", "(leave intact)\n");
		return;
	}

	/*
	 * Remove from device tree; a race with a new incoming span
	 * will create a new softc and disk.
	 */
	RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
	sc->terminating = 1;

	/*
	 * Device has to go first to prevent device ops races.
	 */
	if (sc->dev) {
		disk_destroy(&sc->disk);
		devstat_remove_entry(&sc->stats);
		sc->dev->si_drv1 = NULL;
		sc->dev = NULL;
	}

	xa_printf(1, "%s", "(remove from tree)\n");
	sc->serializing = 1;
	KKASSERT(sc->opencnt == 0);
	KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

	while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
		tag->sc = NULL;
		kfree(tag, M_XDISK);
	}

	kfree(sc, M_XDISK);
}

/************************************************************************
 *			   XA DEVICE INTERFACE				*
 ************************************************************************/

static int
xa_open(struct dev_open_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *sc;
	int error;

	dev->si_bsize_phys = 512;
	dev->si_bsize_best = 32768;

	/*
	 * Interlock open with opencnt, wait for attachment operations
	 * to finish.
	 */
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
again:
	sc = dev->si_drv1;
	if (sc == NULL) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	if (sc->serializing) {
		tsleep(sc, 0, "xarace", hz / 10);
		goto again;
	}
	if (sc->terminating) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	sc->serializing = 1;

	/*
	 * Serialize initial open
	 */
	if (sc->opencnt++ > 0) {
		sc->serializing = 0;
		wakeup(sc);
		lockmgr(&xdisk_lk, LK_RELEASE);
		return(0);
	}

	/*
	 * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
	 */
	if (sc->open_tag == NULL) {
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		xa_restart_deferred(sc); /* eats serializing */
		lockmgr(&sc->lk, LK_RELEASE);
	} else {
		sc->serializing = 0;
		wakeup(sc);
	}
	lockmgr(&xdisk_lk, LK_RELEASE);

	/*
	 * Wait for completion of the BLK_OPEN
	 */
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	while (sc->serializing)
		lksleep(sc, &xdisk_lk, 0, "xaopen", hz);

	error = sc->last_error;
	if (error) {
		KKASSERT(sc->opencnt > 0);
		--sc->opencnt;
		xa_terminate_check(sc);
		sc = NULL;	/* sc may be invalid now */
	}
	lockmgr(&xdisk_lk, LK_RELEASE);

	return (error);
}

static int
xa_close(struct dev_close_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *sc;
	xa_tag_t *tag;

	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	sc = dev->si_drv1;
	if (sc == NULL) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	if (sc->terminating) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	lockmgr(&sc->lk, LK_EXCLUSIVE);

	/*
	 * NOTE: Clearing open_tag allows a concurrent open to re-open
	 *	 the device and prevents autonomous completion of the tag.
	 */
	if (sc->opencnt == 1 && sc->open_tag) {
		tag = sc->open_tag;
		sc->open_tag = NULL;
		lockmgr(&sc->lk, LK_RELEASE);
		kdmsg_state_reply(tag->state, 0);	/* close our side */
		xa_wait(tag);				/* wait on remote */
	} else {
		lockmgr(&sc->lk, LK_RELEASE);
	}
	KKASSERT(sc->opencnt > 0);
	--sc->opencnt;
	xa_terminate_check(sc);
	lockmgr(&xdisk_lk, LK_RELEASE);

	return(0);
}

static int
xa_strategy(struct dev_strategy_args *ap)
{
	xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
	xa_tag_t *tag;
	struct bio *bio = ap->a_bio;

	devstat_start_transaction(&sc->stats);
	atomic_add_int(&xa_active, 1);
	xa_last = bio->bio_offset;

	/*
	 * If no tags are available NULL is returned and the bio is
	 * placed on sc->bioq.
	 */
	lockmgr(&sc->lk, LK_EXCLUSIVE);
	tag = xa_setup_cmd(sc, bio);
	if (tag)
		xa_start(tag, NULL, 1);
	lockmgr(&sc->lk, LK_RELEASE);

	return(0);
}

static int
xa_ioctl(struct dev_ioctl_args *ap)
{
	return(ENOTTY);
}

static int
xa_size(struct dev_psize_args *ap)
{
	struct xa_softc *sc;

	if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
		return (ENXIO);
	ap->a_result = sc->info.d_media_blocks;
	return (0);
}

/************************************************************************
 *		    XA BLOCK PROTOCOL STATE MACHINE			*
 ************************************************************************/

/*
 * Implement tag/msg setup and related functions.
 * Called with sc->lk held.
 */
static xa_tag_t *
xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
{
	xa_tag_t *tag;

	/*
	 * Only get a tag if we have a valid virtual circuit to the server.
	 */
	if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
		tag->bio = bio;
		TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
	}

	/*
	 * If we can't dispatch now and this is a bio, queue it for later.
	 */
	if (tag == NULL && bio) {
		TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
	}

	return (tag);
}
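
/*
 * Tag lifetime, in brief: xa_strategy() obtains a tag via xa_setup_cmd()
 * and dispatches it with xa_start(); the remote's reply arrives in
 * xa_bio_completion() (or xa_sync_completion() for BLK_OPEN), which calls
 * xa_done(), and xa_release() then recycles the tag, possibly starting
 * the next queued BIO.
 */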

/*
 * Called with sc->lk held
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
{
	xa_softc_t *sc = tag->sc;

	tag->done = 0;
	tag->async = async;
	tag->status.head.error = DMSG_ERR_IO;	/* fallback error */

	if (msg == NULL) {
		struct bio *bio;
		struct buf *bp;
		kdmsg_state_t *trans;

		if (sc->opencnt == 0 || sc->open_tag == NULL) {
			TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
				if ((trans->rxcmd & DMSGF_DELETE) == 0)
					break;
			}
		} else {
			trans = sc->open_tag->state;
		}
		if (trans == NULL)
			goto skip;

		KKASSERT(tag->bio);
		bio = tag->bio;
		bp = bio->bio_buf;

		switch(bp->b_cmd) {
		case BUF_CMD_READ:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_READ |
					      DMSGF_CREATE |
					      DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_read.keyid = sc->keyid;
			msg->any.blk_read.offset = bio->bio_offset;
			msg->any.blk_read.bytes = bp->b_bcount;
			break;
		case BUF_CMD_WRITE:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_WRITE |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_write.keyid = sc->keyid;
			msg->any.blk_write.offset = bio->bio_offset;
			msg->any.blk_write.bytes = bp->b_bcount;
			msg->aux_data = bp->b_data;
			msg->aux_size = bp->b_bcount;
			break;
		case BUF_CMD_FLUSH:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_FLUSH |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_flush.keyid = sc->keyid;
			msg->any.blk_flush.offset = bio->bio_offset;
			msg->any.blk_flush.bytes = bp->b_bcount;
			break;
		case BUF_CMD_FREEBLKS:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_FREEBLKS |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_freeblks.keyid = sc->keyid;
			msg->any.blk_freeblks.offset = bio->bio_offset;
			msg->any.blk_freeblks.bytes = bp->b_bcount;
			break;
		default:
			bp->b_flags |= B_ERROR;
			bp->b_error = EIO;
			devstat_end_transaction_buf(&sc->stats, bp);
			atomic_add_int(&xa_active, -1);
			biodone(bio);
			tag->bio = NULL;
			break;
		}
	}

	/*
	 * If no msg was allocated we likely could not find a good span.
	 */
skip:
	if (msg) {
		/*
		 * Message was passed in or constructed.
		 */
		tag->state = msg->state;
		lockmgr(&sc->lk, LK_RELEASE);
		kdmsg_msg_write(msg);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
	} else if (tag->bio &&
		   (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) {
		/*
		 * No spans available but BIO is not allowed to fail
		 * on connectivity problems.  Requeue the BIO.
		 */
		TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act);
		tag->bio = NULL;
		lockmgr(&sc->lk, LK_RELEASE);
		xa_done(tag, 1);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
	} else {
		/*
		 * No spans available, bio is allowed to fail.
		 */
		lockmgr(&sc->lk, LK_RELEASE);
		tag->status.head.error = DMSG_ERR_IO;
		xa_done(tag, 1);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
	}
}

static uint32_t
xa_wait(xa_tag_t *tag)
{
	xa_softc_t *sc = tag->sc;
	uint32_t error;

	lockmgr(&sc->lk, LK_EXCLUSIVE);
	tag->waiting = 1;
	while (tag->done == 0)
		lksleep(tag, &sc->lk, 0, "xawait", 0);
	lockmgr(&sc->lk, LK_RELEASE);

	error = tag->status.head.error;
	tag->waiting = 0;
	xa_release(tag, 0);

	return error;
}

static void
xa_done(xa_tag_t *tag, int wasbio)
{
	KKASSERT(tag->bio == NULL);

	tag->state = NULL;
	tag->done = 1;
	if (tag->waiting)
		wakeup(tag);
	if (tag->async)
		xa_release(tag, wasbio);
}

/*
 * Release a tag.  If everything looks ok and there are pending BIOs
 * (due to all tags in-use), we can use the tag to start the next BIO.
 * Do not try to restart if the connection is currently failed.
 */
static
void
xa_release(xa_tag_t *tag, int wasbio)
{
	xa_softc_t *sc = tag->sc;
	struct bio *bio;

	if ((bio = tag->bio) != NULL) {
		struct buf *bp = bio->bio_buf;

		bp->b_error = EIO;
		bp->b_flags |= B_ERROR;
		devstat_end_transaction_buf(&sc->stats, bp);
		atomic_add_int(&xa_active, -1);
		biodone(bio);
		tag->bio = NULL;
	}

	lockmgr(&sc->lk, LK_EXCLUSIVE);

	if (wasbio && sc->open_tag &&
	    (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
		TAILQ_REMOVE(&sc->bioq, bio, bio_act);
		tag->bio = bio;
		xa_start(tag, NULL, 1);
	} else {
		TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
		TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
	}
	lockmgr(&sc->lk, LK_RELEASE);
}

/*
 * Handle messages under the BLKOPEN transaction.
 */
static int
xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc;
	struct bio *bio;

	/*
	 * If the tag has been cleaned out we already closed our side
	 * of the transaction and we are waiting for the other side to
	 * close.
	 */
	xa_printf(1, "xa_sync_completion: tag %p msg %08x state %p\n",
		  tag, msg->any.head.cmd, msg->state);

	if (tag == NULL) {
		if (msg->any.head.cmd & DMSGF_CREATE)
			kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
		return 0;
	}
	sc = tag->sc;

	/*
	 * Validate the tag
	 */
	lockmgr(&sc->lk, LK_EXCLUSIVE);

	/*
	 * Handle initial response to our open and restart any deferred
	 * BIOs on success.
	 *
	 * NOTE: DELETE may also be set.
	 */
	if (msg->any.head.cmd & DMSGF_CREATE) {
		switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
		case DMSG_LNK_ERROR | DMSGF_REPLY:
			bzero(&tag->status, sizeof(tag->status));
			tag->status.head = msg->any.head;
			break;
		case DMSG_BLK_ERROR | DMSGF_REPLY:
			tag->status = msg->any.blk_error;
			break;
		}
		sc->last_error = tag->status.head.error;
		xa_printf(1, "blk_open completion status %d\n",
			  sc->last_error);
		if (sc->last_error == 0) {
			while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
				tag = xa_setup_cmd(sc, NULL);
				if (tag == NULL)
					break;
				TAILQ_REMOVE(&sc->bioq, bio, bio_act);
				tag->bio = bio;
				xa_start(tag, NULL, 1);
			}
		}
		sc->serializing = 0;
		wakeup(sc);
	}

	/*
	 * Handle unexpected termination (or lost comm channel) from other
	 * side.  Autonomous completion only if open_tag matches,
	 * otherwise another thread is probably waiting on the tag.
	 *
	 * (see xa_close() for other interactions)
	 */
	if (msg->any.head.cmd & DMSGF_DELETE) {
		kdmsg_state_reply(tag->state, 0);
		if (sc->open_tag == tag) {
			sc->open_tag = NULL;
			xa_done(tag, 0);
		} else {
			tag->async = 0;
			xa_done(tag, 0);
		}
	}
	lockmgr(&sc->lk, LK_RELEASE);

	return (0);
}

static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc = tag->sc;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * If the device is open, stall the bio on DMSG errors.  If an
	 * actual I/O error occurred on the remote device, DMSG_ERR_IO
	 * will be returned.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
		if (tag->status.head.error != DMSG_ERR_IO)
			goto handle_repend;
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if (tag->status.head.error != 0) {
			bp->b_error = EIO;
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		devstat_end_transaction_buf(&sc->stats, bp);
		atomic_add_int(&xa_active, -1);
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *	 CREATE+DELETEs, so we won't have to terminate the
	 *	 transaction separately, here.  But just in case they
	 *	 aren't, be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we put the BIO back onto the bioq for a later restart.
	 *
	 * Probe I/Os (where the device is not open) are failed
	 * instead of requeued.
	 */
handle_repend:
	tag->bio = NULL;
	if (bio->bio_buf->b_flags & B_FAILONDIS) {
		xa_printf(1, "xa_strategy: lost link, fail probe bp %p\n",
			  bio->bio_buf);
		bio->bio_buf->b_error = ENXIO;
		bio->bio_buf->b_flags |= B_ERROR;
		biodone(bio);
		bio = NULL;
	} else {
		xa_printf(1, "xa_strategy: lost link, requeue bp %p\n",
			  bio->bio_buf);
	}
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Requeue the bio
	 */
	if (bio) {
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
		lockmgr(&sc->lk, LK_RELEASE);
	}
	return (0);
}

/*
 * Restart as much deferred I/O as we can.  The serializer is set and we
 * eat it (clear it) when done.
 *
 * Called with sc->lk held
 */
static
void
xa_restart_deferred(xa_softc_t *sc)
{
	kdmsg_state_t *span;
	kdmsg_msg_t *msg;
	xa_tag_t *tag;
	int error;

	KKASSERT(sc->serializing);

	/*
	 * Determine if a restart is needed.
	 */
	if (sc->opencnt == 0) {
		/*
		 * Device is not open, nothing to do, eat serializing.
		 */
		sc->serializing = 0;
		wakeup(sc);
	} else if (sc->open_tag == NULL) {
		/*
		 * BLK_OPEN required before we can restart any BIOs.
		 * Select the best LNK_SPAN to issue the BLK_OPEN under.
		 *
		 * serializing interlocks waiting open()s.
		 */
		error = 0;
		TAILQ_FOREACH(span, &sc->spanq, user_entry) {
			if ((span->rxcmd & DMSGF_DELETE) == 0)
				break;
		}
		if (span == NULL)
			error = ENXIO;

		if (error == 0) {
			tag = xa_setup_cmd(sc, NULL);
			if (tag == NULL)
				error = ENXIO;
		}
		if (error == 0) {
			sc->open_tag = tag;
			msg = kdmsg_msg_alloc(span,
					      DMSG_BLK_OPEN |
					      DMSGF_CREATE,
					      xa_sync_completion, tag);
			msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
			xa_printf(1,
				  "BLK_OPEN tag %p state %p "
				  "span-state %p\n",
				  tag, msg->state, span);
			xa_start(tag, msg, 0);
		}
		if (error) {
			sc->serializing = 0;
			wakeup(sc);
		}
		/* else leave serializing set until BLK_OPEN response */
	} else {
		/* nothing to do */
		sc->serializing = 0;
		wakeup(sc);
	}
}