1 /* 2 * Copyright (c) 2012-2014 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 /* 35 * This module allows disk devices to be created and associated with a 36 * communications pipe or socket. You open the device and issue an 37 * ioctl() to install a new disk along with its communications descriptor. 38 * 39 * All further communication occurs via the descriptor using the DMSG 40 * LNK_CONN, LNK_SPAN, and BLOCK protocols. The descriptor can be a 41 * direct connection to a remote machine's disk (in-kernenl), to a remote 42 * cluster controller, to the local cluster controller, etc. 43 * 44 * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d 45 * devices. These devices look like raw disks to the system. 46 */ 47 #include <sys/param.h> 48 #include <sys/systm.h> 49 #include <sys/buf.h> 50 #include <sys/conf.h> 51 #include <sys/device.h> 52 #include <sys/devicestat.h> 53 #include <sys/disk.h> 54 #include <sys/kernel.h> 55 #include <sys/malloc.h> 56 #include <sys/sysctl.h> 57 #include <sys/proc.h> 58 #include <sys/queue.h> 59 #include <sys/tree.h> 60 #include <sys/udev.h> 61 #include <sys/uuid.h> 62 #include <sys/kern_syscall.h> 63 64 #include <sys/dmsg.h> 65 #include <sys/xdiskioctl.h> 66 67 #include <sys/buf2.h> 68 69 struct xa_softc; 70 struct xa_softc_tree; 71 RB_HEAD(xa_softc_tree, xa_softc); 72 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp); 73 74 static int xa_active; 75 SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0, 76 "Number of active xdisk IOs"); 77 static uint64_t xa_last; 78 SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0, 79 "Offset of last xdisk IO"); 80 static int xa_debug = 1; 81 SYSCTL_INT(_debug, OID_AUTO, xa_debug, CTLFLAG_RW, &xa_debug, 0, 82 "xdisk debugging"); 83 84 /* 85 * Track a BIO tag 86 */ 87 struct xa_tag { 88 TAILQ_ENTRY(xa_tag) entry; 89 struct xa_softc *sc; 90 dmsg_blk_error_t status; 91 kdmsg_state_t *state; 92 struct bio *bio; 93 int waiting; 94 int async; 95 int done; 96 }; 97 98 typedef struct xa_tag xa_tag_t; 99 100 /* 101 * Track devices. 102 */ 103 struct xa_softc { 104 struct kdmsg_state_list spanq; 105 RB_ENTRY(xa_softc) rbnode; 106 cdev_t dev; 107 struct devstat stats; 108 struct disk_info info; 109 struct disk disk; 110 uuid_t peer_id; 111 int unit; 112 int opencnt; 113 int spancnt; 114 uint64_t keyid; 115 int serializing; 116 int last_error; 117 int terminating; 118 char peer_label[64]; /* from LNK_SPAN host/dev */ 119 char pfs_label[64]; /* from LNK_SPAN serno */ 120 xa_tag_t *open_tag; 121 TAILQ_HEAD(, bio) bioq; /* pending BIOs */ 122 TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */ 123 TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */ 124 struct lock lk; 125 }; 126 127 typedef struct xa_softc xa_softc_t; 128 129 struct xa_iocom { 130 TAILQ_ENTRY(xa_iocom) entry; 131 kdmsg_iocom_t iocom; 132 xa_softc_t dummysc; 133 }; 134 135 typedef struct xa_iocom xa_iocom_t; 136 137 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2); 138 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp); 139 static struct xa_softc_tree xa_device_tree; 140 141 #define MAXTAGS 64 /* no real limit */ 142 143 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc); 144 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc); 145 static void xaio_exit(kdmsg_iocom_t *iocom); 146 static int xaio_rcvdmsg(kdmsg_msg_t *msg); 147 148 static void xa_terminate_check(struct xa_softc *sc); 149 150 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio); 151 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async); 152 static void xa_done(xa_tag_t *tag, int wasbio); 153 static void xa_release(xa_tag_t *tag, int wasbio); 154 static uint32_t xa_wait(xa_tag_t *tag); 155 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg); 156 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg); 157 static void xa_restart_deferred(xa_softc_t *sc); 158 159 #define xa_printf(level, ctl, ...) \ 160 if (xa_debug >= (level)) kprintf("xdisk: " ctl, __VA_ARGS__) 161 162 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks"); 163 164 /* 165 * Control device, issue ioctls to create xa devices. 166 */ 167 static d_open_t xdisk_open; 168 static d_close_t xdisk_close; 169 static d_ioctl_t xdisk_ioctl; 170 171 static struct dev_ops xdisk_ops = { 172 { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE }, 173 .d_open = xdisk_open, 174 .d_close = xdisk_close, 175 .d_ioctl = xdisk_ioctl 176 }; 177 178 /* 179 * XA disk devices 180 */ 181 static d_open_t xa_open; 182 static d_close_t xa_close; 183 static d_ioctl_t xa_ioctl; 184 static d_strategy_t xa_strategy; 185 static d_psize_t xa_size; 186 187 static struct dev_ops xa_ops = { 188 { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE }, 189 .d_open = xa_open, 190 .d_close = xa_close, 191 .d_ioctl = xa_ioctl, 192 .d_read = physread, 193 .d_write = physwrite, 194 .d_strategy = xa_strategy, 195 .d_psize = xa_size 196 }; 197 198 static int xdisk_opencount; 199 static cdev_t xdisk_dev; 200 struct lock xdisk_lk; 201 static TAILQ_HEAD(, xa_iocom) xaiocomq; 202 203 /* 204 * Module initialization 205 */ 206 static int 207 xdisk_modevent(module_t mod, int type, void *data) 208 { 209 switch (type) { 210 case MOD_LOAD: 211 TAILQ_INIT(&xaiocomq); 212 RB_INIT(&xa_device_tree); 213 lockinit(&xdisk_lk, "xdisk", 0, 0); 214 xdisk_dev = make_dev(&xdisk_ops, 0, 215 UID_ROOT, GID_WHEEL, 0600, "xdisk"); 216 break; 217 case MOD_UNLOAD: 218 case MOD_SHUTDOWN: 219 if (!RB_EMPTY(&xa_device_tree)) 220 return (EBUSY); 221 if (xdisk_opencount || TAILQ_FIRST(&xaiocomq)) 222 return (EBUSY); 223 if (xdisk_dev) { 224 destroy_dev(xdisk_dev); 225 xdisk_dev = NULL; 226 } 227 dev_ops_remove_all(&xdisk_ops); 228 dev_ops_remove_all(&xa_ops); 229 break; 230 default: 231 break; 232 } 233 return 0; 234 } 235 236 DEV_MODULE(xdisk, xdisk_modevent, 0); 237 238 static int 239 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2) 240 { 241 return(strcmp(sc1->pfs_label, sc2->pfs_label)); 242 } 243 244 /* 245 * Control device 246 */ 247 static int 248 xdisk_open(struct dev_open_args *ap) 249 { 250 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 251 ++xdisk_opencount; 252 lockmgr(&xdisk_lk, LK_RELEASE); 253 return(0); 254 } 255 256 static int 257 xdisk_close(struct dev_close_args *ap) 258 { 259 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 260 --xdisk_opencount; 261 lockmgr(&xdisk_lk, LK_RELEASE); 262 return(0); 263 } 264 265 static int 266 xdisk_ioctl(struct dev_ioctl_args *ap) 267 { 268 int error; 269 270 switch(ap->a_cmd) { 271 case XDISKIOCATTACH: 272 error = xdisk_attach((void *)ap->a_data); 273 break; 274 case XDISKIOCDETACH: 275 error = xdisk_detach((void *)ap->a_data); 276 break; 277 default: 278 error = ENOTTY; 279 break; 280 } 281 return error; 282 } 283 284 /************************************************************************ 285 * DMSG INTERFACE * 286 ************************************************************************/ 287 288 static int 289 xdisk_attach(struct xdisk_attach_ioctl *xaioc) 290 { 291 xa_iocom_t *xaio; 292 struct file *fp; 293 294 /* 295 * Normalize ioctl params 296 */ 297 fp = holdfp(curthread, xaioc->fd, -1); 298 if (fp == NULL) 299 return EINVAL; 300 xa_printf(1, "xdisk_attach fp=%p\n", fp); 301 302 /* 303 * See if the serial number is already present. If we are 304 * racing a termination the disk subsystem may still have 305 * duplicate entries not yet removed so we wait a bit and 306 * retry. 307 */ 308 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 309 310 xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO); 311 kdmsg_iocom_init(&xaio->iocom, xaio, 312 KDMSG_IOCOMF_AUTOCONN, 313 M_XDISK, xaio_rcvdmsg); 314 xaio->iocom.exit_func = xaio_exit; 315 316 kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk"); 317 318 /* 319 * Setup our LNK_CONN advertisement for autoinitiate. 320 * 321 * Our filter is setup to only accept PEER_BLOCK advertisements. 322 * XXX no peer_id filter. 323 * 324 * We need a unique pfs_fsid to avoid confusion. 325 */ 326 xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_CLIENT; 327 xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1; 328 xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK; 329 ksnprintf(xaio->iocom.auto_lnk_conn.peer_label, 330 sizeof(xaio->iocom.auto_lnk_conn.peer_label), 331 "%s/xdisk", 332 hostname); 333 /* kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1); */ 334 335 /* 336 * Setup our LNK_SPAN advertisement for autoinitiate 337 */ 338 TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry); 339 kdmsg_iocom_autoinitiate(&xaio->iocom, NULL); 340 341 lockmgr(&xdisk_lk, LK_RELEASE); 342 343 return 0; 344 } 345 346 static int 347 xdisk_detach(struct xdisk_attach_ioctl *xaioc) 348 { 349 return EINVAL; 350 } 351 352 /* 353 * Called from iocom core transmit thread upon disconnect. 354 */ 355 static 356 void 357 xaio_exit(kdmsg_iocom_t *iocom) 358 { 359 xa_iocom_t *xaio = iocom->handle; 360 361 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 362 xa_printf(1, "%s", "xdisk_detach [xaio_exit()]\n"); 363 TAILQ_REMOVE(&xaiocomq, xaio, entry); 364 lockmgr(&xdisk_lk, LK_RELEASE); 365 366 kdmsg_iocom_uninit(&xaio->iocom); 367 368 kfree(xaio, M_XDISK); 369 } 370 371 /* 372 * Called from iocom core to handle messages that the iocom core does not 373 * handle itself and for which a state function callback has not yet been 374 * established. 375 * 376 * We primarily care about LNK_SPAN transactions here. 377 */ 378 static int 379 xaio_rcvdmsg(kdmsg_msg_t *msg) 380 { 381 kdmsg_state_t *state = msg->state; 382 xa_iocom_t *xaio = state->iocom->handle; 383 xa_softc_t *sc; 384 385 if (state) { 386 xa_printf(4, 387 "xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n", 388 state, state->rxcmd, state->txcmd, 389 msg->any.head.cmd); 390 } 391 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 392 393 switch(msg->tcmd) { 394 case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE: 395 /* 396 * A LNK_SPAN transaction which is opened and closed 397 * degenerately is not useful to us, just ignore it. 398 */ 399 kdmsg_msg_reply(msg, 0); 400 break; 401 case DMSG_LNK_SPAN | DMSGF_CREATE: 402 /* 403 * Manage the tracking node for the remote LNK_SPAN. 404 * 405 * Return a streaming result, leaving the transaction open 406 * in both directions to allow sub-transactions. 407 */ 408 bcopy(msg->any.lnk_span.peer_label, xaio->dummysc.peer_label, 409 sizeof(xaio->dummysc.peer_label)); 410 xaio->dummysc.peer_label[ 411 sizeof(xaio->dummysc.peer_label) - 1] = 0; 412 413 bcopy(msg->any.lnk_span.pfs_label, xaio->dummysc.pfs_label, 414 sizeof(xaio->dummysc.pfs_label)); 415 xaio->dummysc.pfs_label[ 416 sizeof(xaio->dummysc.pfs_label) - 1] = 0; 417 418 xa_printf(3, "LINK_SPAN state %p create for %s\n", 419 msg->state, msg->any.lnk_span.pfs_label); 420 421 sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc); 422 if (sc == NULL) { 423 xa_softc_t *sctmp; 424 xa_tag_t *tag; 425 cdev_t dev; 426 int unit; 427 int n; 428 429 sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO); 430 bcopy(msg->any.lnk_span.peer_label, sc->peer_label, 431 sizeof(sc->peer_label)); 432 sc->peer_label[sizeof(sc->peer_label) - 1] = 0; 433 bcopy(msg->any.lnk_span.pfs_label, sc->pfs_label, 434 sizeof(sc->pfs_label)); 435 sc->pfs_label[sizeof(sc->pfs_label) - 1] = 0; 436 437 /* XXX FIXME O(N^2) */ 438 unit = -1; 439 do { 440 ++unit; 441 RB_FOREACH(sctmp, xa_softc_tree, 442 &xa_device_tree) { 443 if (sctmp->unit == unit) 444 break; 445 } 446 } while (sctmp); 447 448 sc->unit = unit; 449 sc->serializing = 1; 450 sc->spancnt = 1; 451 lockinit(&sc->lk, "xalk", 0, 0); 452 TAILQ_INIT(&sc->spanq); 453 TAILQ_INIT(&sc->bioq); 454 TAILQ_INIT(&sc->tag_freeq); 455 TAILQ_INIT(&sc->tag_pendq); 456 457 lockmgr(&sc->lk, LK_EXCLUSIVE); 458 RB_INSERT(xa_softc_tree, &xa_device_tree, sc); 459 TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry); 460 msg->state->any.xa_sc = sc; 461 462 /* 463 * Setup block device 464 */ 465 for (n = 0; n < MAXTAGS; ++n) { 466 tag = kmalloc(sizeof(*tag), 467 M_XDISK, M_WAITOK|M_ZERO); 468 tag->sc = sc; 469 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry); 470 } 471 472 if (sc->dev == NULL) { 473 dev = disk_create(unit, &sc->disk, &xa_ops); 474 dev->si_drv1 = sc; 475 sc->dev = dev; 476 devstat_add_entry(&sc->stats, "xa", unit, 477 DEV_BSIZE, 478 DEVSTAT_NO_ORDERED_TAGS, 479 DEVSTAT_TYPE_DIRECT | 480 DEVSTAT_TYPE_IF_OTHER, 481 DEVSTAT_PRIORITY_OTHER); 482 } 483 484 sc->info.d_media_blksize = 485 msg->any.lnk_span.media.block.blksize; 486 if (sc->info.d_media_blksize <= 0) 487 sc->info.d_media_blksize = 1; 488 sc->info.d_media_blocks = 489 msg->any.lnk_span.media.block.bytes / 490 sc->info.d_media_blksize; 491 sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE; 492 sc->info.d_secpertrack = 32; 493 sc->info.d_nheads = 64; 494 sc->info.d_secpercyl = sc->info.d_secpertrack * 495 sc->info.d_nheads; 496 sc->info.d_ncylinders = 0; 497 if (sc->pfs_label[0]) 498 sc->info.d_serialno = sc->pfs_label; 499 /* 500 * WARNING! disk_setdiskinfo() must be asynchronous 501 * because we are in the rxmsg thread. If 502 * it is synchronous and issues more disk 503 * I/Os, we will deadlock. 504 */ 505 disk_setdiskinfo(&sc->disk, &sc->info); 506 xa_restart_deferred(sc); /* eats serializing */ 507 lockmgr(&sc->lk, LK_RELEASE); 508 } else { 509 lockmgr(&sc->lk, LK_EXCLUSIVE); 510 ++sc->spancnt; 511 TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry); 512 msg->state->any.xa_sc = sc; 513 if (sc->serializing == 0 && sc->open_tag == NULL) { 514 sc->serializing = 1; 515 xa_restart_deferred(sc); /* eats serializing */ 516 } 517 lockmgr(&sc->lk, LK_RELEASE); 518 if (sc->dev && sc->dev->si_disk) { 519 xa_printf(1, "reprobe disk: %s\n", 520 sc->pfs_label); 521 disk_msg_send(DISK_DISK_REPROBE, 522 sc->dev->si_disk, 523 NULL); 524 } 525 } 526 xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt); 527 kdmsg_msg_result(msg, 0); 528 break; 529 case DMSG_LNK_SPAN | DMSGF_DELETE: 530 /* 531 * Manage the tracking node for the remote LNK_SPAN. 532 * 533 * Return a final result, closing our end of the transaction. 534 */ 535 sc = msg->state->any.xa_sc; 536 xa_printf(3, "LINK_SPAN state %p delete for %s (sc=%p)\n", 537 msg->state, (sc ? sc->pfs_label : "(null)"), sc); 538 lockmgr(&sc->lk, LK_EXCLUSIVE); 539 msg->state->any.xa_sc = NULL; 540 TAILQ_REMOVE(&sc->spanq, msg->state, user_entry); 541 --sc->spancnt; 542 543 xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt); 544 545 /* 546 * Spans can come and go as the graph stabilizes, so if 547 * we lose a span along with sc->open_tag we may be able 548 * to restart the I/Os on a different span. 549 */ 550 if (sc->spancnt && 551 sc->serializing == 0 && sc->open_tag == NULL) { 552 sc->serializing = 1; 553 xa_restart_deferred(sc); 554 } 555 lockmgr(&sc->lk, LK_RELEASE); 556 kdmsg_msg_reply(msg, 0); 557 558 #if 0 559 /* 560 * Termination 561 */ 562 if (sc->spancnt == 0) 563 xa_terminate_check(sc); 564 #endif 565 break; 566 case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY: 567 /* 568 * Ignore unimplemented streaming replies on our LNK_SPAN 569 * transaction. 570 */ 571 xa_printf(3, "LINK_SPAN state %p delete+reply\n", 572 msg->state); 573 break; 574 case DMSG_LNK_SPAN | DMSGF_REPLY: 575 /* 576 * Ignore unimplemented streaming replies on our LNK_SPAN 577 * transaction. 578 */ 579 xa_printf(3, "LINK_SPAN state %p reply\n", 580 msg->state); 581 break; 582 case DMSG_DBG_SHELL: 583 /* 584 * Execute shell command (not supported atm). 585 * 586 * This is a one-way packet but if not (e.g. if part of 587 * a streaming transaction), we will have already closed 588 * our end. 589 */ 590 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP); 591 break; 592 case DMSG_DBG_SHELL | DMSGF_REPLY: 593 /* 594 * Receive one or more replies to a shell command 595 * that we sent. Just dump it to the console. 596 * 597 * This is a one-way packet but if not (e.g. if 598 * part of a streaming transaction), we will have 599 * already closed our end. 600 */ 601 if (msg->aux_data) { 602 msg->aux_data[msg->aux_size - 1] = 0; 603 xa_printf(0, "DEBUGMSG: %s\n", msg->aux_data); 604 } 605 break; 606 default: 607 /* 608 * Unsupported one-way message, streaming message, or 609 * transaction. 610 * 611 * Terminate any unsupported transactions with an error 612 * and ignore any unsupported streaming messages. 613 * 614 * NOTE: This case also includes DMSG_LNK_ERROR messages 615 * which might be one-way, replying to those would 616 * cause an infinite ping-pong. 617 */ 618 if (msg->any.head.cmd & DMSGF_CREATE) 619 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP); 620 break; 621 } 622 lockmgr(&xdisk_lk, LK_RELEASE); 623 624 return 0; 625 } 626 627 /* 628 * Determine if we can destroy the xa_softc. 629 * 630 * Called with xdisk_lk held. 631 */ 632 static 633 void 634 xa_terminate_check(struct xa_softc *sc) 635 { 636 xa_tag_t *tag; 637 638 /* 639 * Determine if we can destroy the softc. 640 */ 641 xa_printf(1, "Terminate check xa%d (%d,%d,%d) sc=%p ", 642 sc->unit, 643 sc->opencnt, sc->serializing, sc->spancnt, 644 sc); 645 646 if (sc->opencnt || sc->serializing || sc->spancnt || 647 TAILQ_FIRST(&sc->bioq) || TAILQ_FIRST(&sc->tag_pendq)) { 648 xa_printf(1, "%s", "(leave intact)\n"); 649 return; 650 } 651 652 /* 653 * Remove from device tree, a race with a new incoming span 654 * will create a new softc and disk. 655 */ 656 RB_REMOVE(xa_softc_tree, &xa_device_tree, sc); 657 sc->terminating = 1; 658 659 /* 660 * Device has to go first to prevent device ops races. 661 */ 662 if (sc->dev) { 663 disk_destroy(&sc->disk); 664 devstat_remove_entry(&sc->stats); 665 sc->dev->si_drv1 = NULL; 666 sc->dev = NULL; 667 } 668 669 xa_printf(1, "%s", "(remove from tree)\n"); 670 sc->serializing = 1; 671 KKASSERT(sc->opencnt == 0); 672 KKASSERT(TAILQ_EMPTY(&sc->tag_pendq)); 673 674 while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) { 675 TAILQ_REMOVE(&sc->tag_freeq, tag, entry); 676 tag->sc = NULL; 677 kfree(tag, M_XDISK); 678 } 679 680 kfree(sc, M_XDISK); 681 } 682 683 /************************************************************************ 684 * XA DEVICE INTERFACE * 685 ************************************************************************/ 686 687 static int 688 xa_open(struct dev_open_args *ap) 689 { 690 cdev_t dev = ap->a_head.a_dev; 691 xa_softc_t *sc; 692 int error; 693 694 dev->si_bsize_phys = 512; 695 dev->si_bsize_best = 32768; 696 697 /* 698 * Interlock open with opencnt, wait for attachment operations 699 * to finish. 700 */ 701 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 702 again: 703 sc = dev->si_drv1; 704 if (sc == NULL) { 705 lockmgr(&xdisk_lk, LK_RELEASE); 706 return ENXIO; /* raced destruction */ 707 } 708 if (sc->serializing) { 709 tsleep(sc, 0, "xarace", hz / 10); 710 goto again; 711 } 712 if (sc->terminating) { 713 lockmgr(&xdisk_lk, LK_RELEASE); 714 return ENXIO; /* raced destruction */ 715 } 716 sc->serializing = 1; 717 718 /* 719 * Serialize initial open 720 */ 721 if (sc->opencnt++ > 0) { 722 sc->serializing = 0; 723 wakeup(sc); 724 lockmgr(&xdisk_lk, LK_RELEASE); 725 return(0); 726 } 727 728 /* 729 * Issue BLK_OPEN if necessary. ENXIO is returned if we have trouble. 730 */ 731 if (sc->open_tag == NULL) { 732 lockmgr(&sc->lk, LK_EXCLUSIVE); 733 xa_restart_deferred(sc); /* eats serializing */ 734 lockmgr(&sc->lk, LK_RELEASE); 735 } else { 736 sc->serializing = 0; 737 wakeup(sc); 738 } 739 lockmgr(&xdisk_lk, LK_RELEASE); 740 741 /* 742 * Wait for completion of the BLK_OPEN 743 */ 744 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 745 while (sc->serializing) 746 lksleep(sc, &xdisk_lk, 0, "xaopen", hz); 747 748 error = sc->last_error; 749 if (error) { 750 KKASSERT(sc->opencnt > 0); 751 --sc->opencnt; 752 xa_terminate_check(sc); 753 sc = NULL; /* sc may be invalid now */ 754 } 755 lockmgr(&xdisk_lk, LK_RELEASE); 756 757 return (error); 758 } 759 760 static int 761 xa_close(struct dev_close_args *ap) 762 { 763 cdev_t dev = ap->a_head.a_dev; 764 xa_softc_t *sc; 765 xa_tag_t *tag; 766 767 lockmgr(&xdisk_lk, LK_EXCLUSIVE); 768 sc = dev->si_drv1; 769 if (sc == NULL) 770 return ENXIO; /* raced destruction */ 771 if (sc->terminating) { 772 lockmgr(&sc->lk, LK_RELEASE); 773 return ENXIO; /* raced destruction */ 774 } 775 lockmgr(&sc->lk, LK_EXCLUSIVE); 776 777 /* 778 * NOTE: Clearing open_tag allows a concurrent open to re-open 779 * the device and prevents autonomous completion of the tag. 780 */ 781 if (sc->opencnt == 1 && sc->open_tag) { 782 tag = sc->open_tag; 783 sc->open_tag = NULL; 784 lockmgr(&sc->lk, LK_RELEASE); 785 kdmsg_state_reply(tag->state, 0); /* close our side */ 786 xa_wait(tag); /* wait on remote */ 787 } else { 788 lockmgr(&sc->lk, LK_RELEASE); 789 } 790 KKASSERT(sc->opencnt > 0); 791 --sc->opencnt; 792 xa_terminate_check(sc); 793 lockmgr(&xdisk_lk, LK_RELEASE); 794 795 return(0); 796 } 797 798 static int 799 xa_strategy(struct dev_strategy_args *ap) 800 { 801 xa_softc_t *sc = ap->a_head.a_dev->si_drv1; 802 xa_tag_t *tag; 803 struct bio *bio = ap->a_bio; 804 805 devstat_start_transaction(&sc->stats); 806 atomic_add_int(&xa_active, 1); 807 xa_last = bio->bio_offset; 808 809 /* 810 * If no tags are available NULL is returned and the bio is 811 * placed on sc->bioq. 812 */ 813 lockmgr(&sc->lk, LK_EXCLUSIVE); 814 tag = xa_setup_cmd(sc, bio); 815 if (tag) 816 xa_start(tag, NULL, 1); 817 lockmgr(&sc->lk, LK_RELEASE); 818 819 return(0); 820 } 821 822 static int 823 xa_ioctl(struct dev_ioctl_args *ap) 824 { 825 return(ENOTTY); 826 } 827 828 static int 829 xa_size(struct dev_psize_args *ap) 830 { 831 struct xa_softc *sc; 832 833 if ((sc = ap->a_head.a_dev->si_drv1) == NULL) 834 return (ENXIO); 835 ap->a_result = sc->info.d_media_blocks; 836 return (0); 837 } 838 839 /************************************************************************ 840 * XA BLOCK PROTOCOL STATE MACHINE * 841 ************************************************************************ 842 * 843 * Implement tag/msg setup and related functions. 844 * Called with sc->lk held. 845 */ 846 static xa_tag_t * 847 xa_setup_cmd(xa_softc_t *sc, struct bio *bio) 848 { 849 xa_tag_t *tag; 850 851 /* 852 * Only get a tag if we have a valid virtual circuit to the server. 853 */ 854 if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) { 855 TAILQ_REMOVE(&sc->tag_freeq, tag, entry); 856 tag->bio = bio; 857 TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry); 858 } 859 860 /* 861 * If we can't dispatch now and this is a bio, queue it for later. 862 */ 863 if (tag == NULL && bio) { 864 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act); 865 } 866 867 return (tag); 868 } 869 870 /* 871 * Called with sc->lk held 872 */ 873 static void 874 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async) 875 { 876 xa_softc_t *sc = tag->sc; 877 878 tag->done = 0; 879 tag->async = async; 880 tag->status.head.error = DMSG_ERR_IO; /* fallback error */ 881 882 if (msg == NULL) { 883 struct bio *bio; 884 struct buf *bp; 885 kdmsg_state_t *trans; 886 887 if (sc->opencnt == 0 || sc->open_tag == NULL) { 888 TAILQ_FOREACH(trans, &sc->spanq, user_entry) { 889 if ((trans->rxcmd & DMSGF_DELETE) == 0) 890 break; 891 } 892 } else { 893 trans = sc->open_tag->state; 894 } 895 if (trans == NULL) 896 goto skip; 897 898 KKASSERT(tag->bio); 899 bio = tag->bio; 900 bp = bio->bio_buf; 901 902 switch(bp->b_cmd) { 903 case BUF_CMD_READ: 904 msg = kdmsg_msg_alloc(trans, 905 DMSG_BLK_READ | 906 DMSGF_CREATE | 907 DMSGF_DELETE, 908 xa_bio_completion, tag); 909 msg->any.blk_read.keyid = sc->keyid; 910 msg->any.blk_read.offset = bio->bio_offset; 911 msg->any.blk_read.bytes = bp->b_bcount; 912 break; 913 case BUF_CMD_WRITE: 914 msg = kdmsg_msg_alloc(trans, 915 DMSG_BLK_WRITE | 916 DMSGF_CREATE | DMSGF_DELETE, 917 xa_bio_completion, tag); 918 msg->any.blk_write.keyid = sc->keyid; 919 msg->any.blk_write.offset = bio->bio_offset; 920 msg->any.blk_write.bytes = bp->b_bcount; 921 msg->aux_data = bp->b_data; 922 msg->aux_size = bp->b_bcount; 923 break; 924 case BUF_CMD_FLUSH: 925 msg = kdmsg_msg_alloc(trans, 926 DMSG_BLK_FLUSH | 927 DMSGF_CREATE | DMSGF_DELETE, 928 xa_bio_completion, tag); 929 msg->any.blk_flush.keyid = sc->keyid; 930 msg->any.blk_flush.offset = bio->bio_offset; 931 msg->any.blk_flush.bytes = bp->b_bcount; 932 break; 933 case BUF_CMD_FREEBLKS: 934 msg = kdmsg_msg_alloc(trans, 935 DMSG_BLK_FREEBLKS | 936 DMSGF_CREATE | DMSGF_DELETE, 937 xa_bio_completion, tag); 938 msg->any.blk_freeblks.keyid = sc->keyid; 939 msg->any.blk_freeblks.offset = bio->bio_offset; 940 msg->any.blk_freeblks.bytes = bp->b_bcount; 941 break; 942 default: 943 bp->b_flags |= B_ERROR; 944 bp->b_error = EIO; 945 devstat_end_transaction_buf(&sc->stats, bp); 946 atomic_add_int(&xa_active, -1); 947 biodone(bio); 948 tag->bio = NULL; 949 break; 950 } 951 } 952 953 /* 954 * If no msg was allocated we likely could not find a good span. 955 */ 956 skip: 957 if (msg) { 958 /* 959 * Message was passed in or constructed. 960 */ 961 tag->state = msg->state; 962 lockmgr(&sc->lk, LK_RELEASE); 963 kdmsg_msg_write(msg); 964 lockmgr(&sc->lk, LK_EXCLUSIVE); 965 } else if (tag->bio && 966 (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) { 967 /* 968 * No spans available but BIO is not allowed to fail 969 * on connectivity problems. Requeue the BIO. 970 */ 971 TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act); 972 tag->bio = NULL; 973 lockmgr(&sc->lk, LK_RELEASE); 974 xa_done(tag, 1); 975 lockmgr(&sc->lk, LK_EXCLUSIVE); 976 } else { 977 /* 978 * No spans available, bio is allowed to fail. 979 */ 980 lockmgr(&sc->lk, LK_RELEASE); 981 tag->status.head.error = DMSG_ERR_IO; 982 xa_done(tag, 1); 983 lockmgr(&sc->lk, LK_EXCLUSIVE); 984 } 985 } 986 987 static uint32_t 988 xa_wait(xa_tag_t *tag) 989 { 990 xa_softc_t *sc = tag->sc; 991 uint32_t error; 992 993 lockmgr(&sc->lk, LK_EXCLUSIVE); 994 tag->waiting = 1; 995 while (tag->done == 0) 996 lksleep(tag, &sc->lk, 0, "xawait", 0); 997 lockmgr(&sc->lk, LK_RELEASE); 998 999 error = tag->status.head.error; 1000 tag->waiting = 0; 1001 xa_release(tag, 0); 1002 1003 return error; 1004 } 1005 1006 static void 1007 xa_done(xa_tag_t *tag, int wasbio) 1008 { 1009 KKASSERT(tag->bio == NULL); 1010 1011 tag->state = NULL; 1012 tag->done = 1; 1013 if (tag->waiting) 1014 wakeup(tag); 1015 if (tag->async) 1016 xa_release(tag, wasbio); 1017 } 1018 1019 /* 1020 * Release a tag. If everything looks ok and there are pending BIOs 1021 * (due to all tags in-use), we can use the tag to start the next BIO. 1022 * Do not try to restart if the connection is currently failed. 1023 */ 1024 static 1025 void 1026 xa_release(xa_tag_t *tag, int wasbio) 1027 { 1028 xa_softc_t *sc = tag->sc; 1029 struct bio *bio; 1030 1031 if ((bio = tag->bio) != NULL) { 1032 struct buf *bp = bio->bio_buf; 1033 1034 bp->b_error = EIO; 1035 bp->b_flags |= B_ERROR; 1036 devstat_end_transaction_buf(&sc->stats, bp); 1037 atomic_add_int(&xa_active, -1); 1038 biodone(bio); 1039 tag->bio = NULL; 1040 } 1041 1042 lockmgr(&sc->lk, LK_EXCLUSIVE); 1043 1044 if (wasbio && sc->open_tag && 1045 (bio = TAILQ_FIRST(&sc->bioq)) != NULL) { 1046 TAILQ_REMOVE(&sc->bioq, bio, bio_act); 1047 tag->bio = bio; 1048 xa_start(tag, NULL, 1); 1049 } else { 1050 TAILQ_REMOVE(&sc->tag_pendq, tag, entry); 1051 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry); 1052 } 1053 lockmgr(&sc->lk, LK_RELEASE); 1054 } 1055 1056 /* 1057 * Handle messages under the BLKOPEN transaction. 1058 */ 1059 static int 1060 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg) 1061 { 1062 xa_tag_t *tag = state->any.any; 1063 xa_softc_t *sc; 1064 struct bio *bio; 1065 1066 /* 1067 * If the tag has been cleaned out we already closed our side 1068 * of the transaction and we are waiting for the other side to 1069 * close. 1070 */ 1071 xa_printf(1, "xa_sync_completion: tag %p msg %08x state %p\n", 1072 tag, msg->any.head.cmd, msg->state); 1073 1074 if (tag == NULL) { 1075 if (msg->any.head.cmd & DMSGF_CREATE) 1076 kdmsg_state_reply(state, DMSG_ERR_LOSTLINK); 1077 return 0; 1078 } 1079 sc = tag->sc; 1080 1081 /* 1082 * Validate the tag 1083 */ 1084 lockmgr(&sc->lk, LK_EXCLUSIVE); 1085 1086 /* 1087 * Handle initial response to our open and restart any deferred 1088 * BIOs on success. 1089 * 1090 * NOTE: DELETE may also be set. 1091 */ 1092 if (msg->any.head.cmd & DMSGF_CREATE) { 1093 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) { 1094 case DMSG_LNK_ERROR | DMSGF_REPLY: 1095 bzero(&tag->status, sizeof(tag->status)); 1096 tag->status.head = msg->any.head; 1097 break; 1098 case DMSG_BLK_ERROR | DMSGF_REPLY: 1099 tag->status = msg->any.blk_error; 1100 break; 1101 } 1102 sc->last_error = tag->status.head.error; 1103 xa_printf(1, "blk_open completion status %d\n", 1104 sc->last_error); 1105 if (sc->last_error == 0) { 1106 while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) { 1107 tag = xa_setup_cmd(sc, NULL); 1108 if (tag == NULL) 1109 break; 1110 TAILQ_REMOVE(&sc->bioq, bio, bio_act); 1111 tag->bio = bio; 1112 xa_start(tag, NULL, 1); 1113 } 1114 } 1115 sc->serializing = 0; 1116 wakeup(sc); 1117 } 1118 1119 /* 1120 * Handle unexpected termination (or lost comm channel) from other 1121 * side. Autonomous completion only if open_tag matches, 1122 * otherwise another thread is probably waiting on the tag. 1123 * 1124 * (see xa_close() for other interactions) 1125 */ 1126 if (msg->any.head.cmd & DMSGF_DELETE) { 1127 kdmsg_state_reply(tag->state, 0); 1128 if (sc->open_tag == tag) { 1129 sc->open_tag = NULL; 1130 xa_done(tag, 0); 1131 } else { 1132 tag->async = 0; 1133 xa_done(tag, 0); 1134 } 1135 } 1136 lockmgr(&sc->lk, LK_RELEASE); 1137 1138 return (0); 1139 } 1140 1141 static int 1142 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg) 1143 { 1144 xa_tag_t *tag = state->any.any; 1145 xa_softc_t *sc = tag->sc; 1146 struct bio *bio; 1147 struct buf *bp; 1148 1149 /* 1150 * Get the bio from the tag. If no bio is present we just do 1151 * 'done' handling. 1152 */ 1153 if ((bio = tag->bio) == NULL) 1154 goto handle_done; 1155 bp = bio->bio_buf; 1156 1157 /* 1158 * Process return status 1159 */ 1160 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) { 1161 case DMSG_LNK_ERROR | DMSGF_REPLY: 1162 bzero(&tag->status, sizeof(tag->status)); 1163 tag->status.head = msg->any.head; 1164 if (tag->status.head.error) 1165 tag->status.resid = bp->b_bcount; 1166 else 1167 tag->status.resid = 0; 1168 break; 1169 case DMSG_BLK_ERROR | DMSGF_REPLY: 1170 tag->status = msg->any.blk_error; 1171 break; 1172 } 1173 1174 /* 1175 * If the device is open stall the bio on DMSG errors. If an 1176 * actual I/O error occured on the remote device, DMSG_ERR_IO 1177 * will be returned. 1178 */ 1179 if (tag->status.head.error && 1180 (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) { 1181 if (tag->status.head.error != DMSG_ERR_IO) 1182 goto handle_repend; 1183 } 1184 1185 /* 1186 * Process bio completion 1187 * 1188 * For reads any returned data is zero-extended if necessary, so 1189 * the server can short-cut any all-zeros reads if it desires. 1190 */ 1191 switch(bp->b_cmd) { 1192 case BUF_CMD_READ: 1193 if (msg->aux_data && msg->aux_size) { 1194 if (msg->aux_size < bp->b_bcount) { 1195 bcopy(msg->aux_data, bp->b_data, msg->aux_size); 1196 bzero(bp->b_data + msg->aux_size, 1197 bp->b_bcount - msg->aux_size); 1198 } else { 1199 bcopy(msg->aux_data, bp->b_data, bp->b_bcount); 1200 } 1201 } else { 1202 bzero(bp->b_data, bp->b_bcount); 1203 } 1204 /* fall through */ 1205 case BUF_CMD_WRITE: 1206 case BUF_CMD_FLUSH: 1207 case BUF_CMD_FREEBLKS: 1208 default: 1209 if (tag->status.resid > bp->b_bcount) 1210 tag->status.resid = bp->b_bcount; 1211 bp->b_resid = tag->status.resid; 1212 if (tag->status.head.error != 0) { 1213 bp->b_error = EIO; 1214 bp->b_flags |= B_ERROR; 1215 } else { 1216 bp->b_resid = 0; 1217 } 1218 devstat_end_transaction_buf(&sc->stats, bp); 1219 atomic_add_int(&xa_active, -1); 1220 biodone(bio); 1221 tag->bio = NULL; 1222 break; 1223 } 1224 1225 /* 1226 * Handle completion of the transaction. If the bioq is not empty 1227 * we can initiate another bio on the same tag. 1228 * 1229 * NOTE: Most of our transactions will be single-message 1230 * CREATE+DELETEs, so we won't have to terminate the 1231 * transaction separately, here. But just in case they 1232 * aren't be sure to terminate the transaction. 1233 */ 1234 handle_done: 1235 if (msg->any.head.cmd & DMSGF_DELETE) { 1236 xa_done(tag, 1); 1237 if ((state->txcmd & DMSGF_DELETE) == 0) 1238 kdmsg_msg_reply(msg, 0); 1239 } 1240 return (0); 1241 1242 /* 1243 * Handle the case where the transaction failed due to a 1244 * connectivity issue. The tag is put away with wasbio=0 1245 * and we put the BIO back onto the bioq for a later restart. 1246 * 1247 * probe I/Os (where the device is not open) will be failed 1248 * instead of requeued. 1249 */ 1250 handle_repend: 1251 tag->bio = NULL; 1252 if (bio->bio_buf->b_flags & B_FAILONDIS) { 1253 xa_printf(1, "xa_strategy: lost link, fail probe bp %p\n", 1254 bio->bio_buf); 1255 bio->bio_buf->b_error = ENXIO; 1256 bio->bio_buf->b_flags |= B_ERROR; 1257 biodone(bio); 1258 bio = NULL; 1259 } else { 1260 xa_printf(1, "xa_strategy: lost link, requeue bp %p\n", 1261 bio->bio_buf); 1262 } 1263 xa_done(tag, 0); 1264 if ((state->txcmd & DMSGF_DELETE) == 0) 1265 kdmsg_msg_reply(msg, 0); 1266 1267 /* 1268 * Requeue the bio 1269 */ 1270 if (bio) { 1271 lockmgr(&sc->lk, LK_EXCLUSIVE); 1272 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act); 1273 lockmgr(&sc->lk, LK_RELEASE); 1274 } 1275 return (0); 1276 } 1277 1278 /* 1279 * Restart as much deferred I/O as we can. The serializer is set and we 1280 * eat it (clear it) when done. 1281 * 1282 * Called with sc->lk held 1283 */ 1284 static 1285 void 1286 xa_restart_deferred(xa_softc_t *sc) 1287 { 1288 kdmsg_state_t *span; 1289 kdmsg_msg_t *msg; 1290 xa_tag_t *tag; 1291 int error; 1292 1293 KKASSERT(sc->serializing); 1294 1295 /* 1296 * Determine if a restart is needed. 1297 */ 1298 if (sc->opencnt == 0) { 1299 /* 1300 * Device is not open, nothing to do, eat serializing. 1301 */ 1302 sc->serializing = 0; 1303 wakeup(sc); 1304 } else if (sc->open_tag == NULL) { 1305 /* 1306 * BLK_OPEN required before we can restart any BIOs. 1307 * Select the best LNK_SPAN to issue the BLK_OPEN under. 1308 * 1309 * serializing interlocks waiting open()s. 1310 */ 1311 error = 0; 1312 TAILQ_FOREACH(span, &sc->spanq, user_entry) { 1313 if ((span->rxcmd & DMSGF_DELETE) == 0) 1314 break; 1315 } 1316 if (span == NULL) 1317 error = ENXIO; 1318 1319 if (error == 0) { 1320 tag = xa_setup_cmd(sc, NULL); 1321 if (tag == NULL) 1322 error = ENXIO; 1323 } 1324 if (error == 0) { 1325 sc->open_tag = tag; 1326 msg = kdmsg_msg_alloc(span, 1327 DMSG_BLK_OPEN | 1328 DMSGF_CREATE, 1329 xa_sync_completion, tag); 1330 msg->any.blk_open.modes = DMSG_BLKOPEN_RD; 1331 xa_printf(1, 1332 "BLK_OPEN tag %p state %p " 1333 "span-state %p\n", 1334 tag, msg->state, span); 1335 xa_start(tag, msg, 0); 1336 } 1337 if (error) { 1338 sc->serializing = 0; 1339 wakeup(sc); 1340 } 1341 /* else leave serializing set until BLK_OPEN response */ 1342 } else { 1343 /* nothing to do */ 1344 sc->serializing = 0; 1345 wakeup(sc); 1346 } 1347 } 1348