1 /* 2 * Copyright (c) 2011-2015 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 
 */

#include "dmsg_local.h"

#define DMSG_BLOCK_DEBUG

/* Global debug verbosity knob (presumably set by the embedding program). */
int DMsgDebugOpt;

/* Count of dynamically allocated transaction states (debug/statistics). */
static unsigned int dmsg_state_count;
#ifdef DMSG_BLOCK_DEBUG
/* Outstanding block-I/O message count (debug builds only). */
static unsigned int biocount;
#endif

static int dmsg_state_msgrx(dmsg_msg_t *msg, int mstate);
static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
static void dmsg_msg_free_locked(dmsg_msg_t *msg);
static void dmsg_state_free(dmsg_state_t *state);
static void dmsg_subq_delete(dmsg_state_t *state);
static void dmsg_simulate_failure(dmsg_state_t *state, int meto, int error);
static void dmsg_state_abort(dmsg_state_t *state);
static void dmsg_state_dying(dmsg_state_t *state);

RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);

/*
 * STATE TREE - Represents open transactions which are indexed by their
 *		{ msgid } relative to the governing iocom.
 *
 * Comparator for the red-black tree above: orders states strictly by
 * msgid.  Returns <0, 0, or >0 in the usual qsort-style convention.
 */
int
dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
{
	if (state1->msgid < state2->msgid)
		return(-1);
	if (state1->msgid > state2->msgid)
		return(1);
	return(0);
}

/*
 * Initialize a low-level ioq.
 *
 * Zeroes the structure, sets the receive state machine to expect a
 * primary header, and initializes the message queue.
 */
void
dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
{
	bzero(ioq, sizeof(*ioq));
	ioq->state = DMSG_MSGQ_STATE_HEADER1;
	TAILQ_INIT(&ioq->msgq);
}

/*
 * Cleanup queue.
 *
 * caller holds iocom->mtx.
 *
 * The queue is expected to already be empty at this point (the loop
 * asserts); any partially-built message in ioq->msg is freed.
 */
void
dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
{
	dmsg_msg_t *msg;

	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		assert(0);	/* shouldn't happen */
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		dmsg_msg_free(msg);
	}
	if ((msg = ioq->msg) != NULL) {
		ioq->msg = NULL;
		dmsg_msg_free(msg);
	}
}

/*
 * Initialize a low-level communications channel.
 *
 * NOTE: The signal_func() is called at least once from the loop and can be
 *	 re-armed via dmsg_iocom_restate().
108 */ 109 void 110 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, 111 void (*signal_func)(dmsg_iocom_t *iocom), 112 void (*rcvmsg_func)(dmsg_msg_t *msg), 113 void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged), 114 void (*altmsg_func)(dmsg_iocom_t *iocom)) 115 { 116 struct stat st; 117 118 bzero(iocom, sizeof(*iocom)); 119 120 asprintf(&iocom->label, "iocom-%p", iocom); 121 iocom->signal_callback = signal_func; 122 iocom->rcvmsg_callback = rcvmsg_func; 123 iocom->altmsg_callback = altmsg_func; 124 iocom->usrmsg_callback = usrmsg_func; 125 126 pthread_mutex_init(&iocom->mtx, NULL); 127 RB_INIT(&iocom->staterd_tree); 128 RB_INIT(&iocom->statewr_tree); 129 TAILQ_INIT(&iocom->txmsgq); 130 iocom->sock_fd = sock_fd; 131 iocom->alt_fd = alt_fd; 132 iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT; 133 if (signal_func) 134 iocom->flags |= DMSG_IOCOMF_SWORK; 135 dmsg_ioq_init(iocom, &iocom->ioq_rx); 136 dmsg_ioq_init(iocom, &iocom->ioq_tx); 137 iocom->state0.refs = 1; /* should never trigger a free */ 138 iocom->state0.iocom = iocom; 139 iocom->state0.parent = &iocom->state0; 140 iocom->state0.flags = DMSG_STATE_ROOT; 141 TAILQ_INIT(&iocom->state0.subq); 142 143 if (pipe(iocom->wakeupfds) < 0) 144 assert(0); 145 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK); 146 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK); 147 148 /* 149 * Negotiate session crypto synchronously. This will mark the 150 * connection as error'd if it fails. If this is a pipe it's 151 * a linkage that we set up ourselves to the filesystem and there 152 * is no crypto. 153 */ 154 if (fstat(sock_fd, &st) < 0) 155 assert(0); 156 if (S_ISSOCK(st.st_mode)) 157 dmsg_crypto_negotiate(iocom); 158 159 /* 160 * Make sure our fds are set to non-blocking for the iocom core. 
161 */ 162 if (sock_fd >= 0) 163 fcntl(sock_fd, F_SETFL, O_NONBLOCK); 164 #if 0 165 /* if line buffered our single fgets() should be fine */ 166 if (alt_fd >= 0) 167 fcntl(alt_fd, F_SETFL, O_NONBLOCK); 168 #endif 169 } 170 171 void 172 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...) 173 { 174 va_list va; 175 char *optr; 176 177 va_start(va, ctl); 178 optr = iocom->label; 179 vasprintf(&iocom->label, ctl, va); 180 va_end(va); 181 if (optr) 182 free(optr); 183 } 184 185 /* 186 * May only be called from a callback from iocom_core. 187 * 188 * Adjust state machine functions, set flags to guarantee that both 189 * the recevmsg_func and the sendmsg_func is called at least once. 190 */ 191 void 192 dmsg_iocom_restate(dmsg_iocom_t *iocom, 193 void (*signal_func)(dmsg_iocom_t *), 194 void (*rcvmsg_func)(dmsg_msg_t *msg)) 195 { 196 pthread_mutex_lock(&iocom->mtx); 197 iocom->signal_callback = signal_func; 198 iocom->rcvmsg_callback = rcvmsg_func; 199 if (signal_func) 200 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK); 201 else 202 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK); 203 pthread_mutex_unlock(&iocom->mtx); 204 } 205 206 void 207 dmsg_iocom_signal(dmsg_iocom_t *iocom) 208 { 209 pthread_mutex_lock(&iocom->mtx); 210 if (iocom->signal_callback) 211 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK); 212 pthread_mutex_unlock(&iocom->mtx); 213 } 214 215 /* 216 * Cleanup a terminating iocom. 217 * 218 * Caller should not hold iocom->mtx. The iocom has already been disconnected 219 * from all possible references to it. 
 */
void
dmsg_iocom_done(dmsg_iocom_t *iocom)
{
	dmsg_crypto_terminate(iocom);
	if (iocom->sock_fd >= 0) {
		close(iocom->sock_fd);
		iocom->sock_fd = -1;
	}
	/* alt_fd is only closed if this iocom owns it (CLOSEALT) */
	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
		close(iocom->alt_fd);
		iocom->alt_fd = -1;
	}
	dmsg_ioq_done(iocom, &iocom->ioq_rx);
	dmsg_ioq_done(iocom, &iocom->ioq_tx);
	if (iocom->wakeupfds[0] >= 0) {
		close(iocom->wakeupfds[0]);
		iocom->wakeupfds[0] = -1;
	}
	if (iocom->wakeupfds[1] >= 0) {
		close(iocom->wakeupfds[1]);
		iocom->wakeupfds[1] = -1;
	}
	pthread_mutex_destroy(&iocom->mtx);
}

/*
 * Allocate a new message using the specified transaction state.
 *
 * If CREATE is set a new transaction is allocated relative to the passed-in
 * transaction (the 'state' argument becomes pstate).
 *
 * If CREATE is not set the message is associated with the passed-in
 * transaction.
 *
 * Locked wrapper: acquires iocom->mtx around dmsg_msg_alloc_locked().
 */
dmsg_msg_t *
dmsg_msg_alloc(dmsg_state_t *state,
	       size_t aux_size, uint32_t cmd,
	       void (*func)(dmsg_msg_t *), void *data)
{
	dmsg_iocom_t *iocom = state->iocom;
	dmsg_msg_t *msg;

	pthread_mutex_lock(&iocom->mtx);
	msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
	pthread_mutex_unlock(&iocom->mtx);

	return msg;
}

/*
 * Core message allocator; caller holds iocom->mtx.
 *
 * See dmsg_msg_alloc() for the CREATE/non-CREATE contract.  Returns a
 * zero-headered message whose msg->state holds one reference.
 */
dmsg_msg_t *
dmsg_msg_alloc_locked(dmsg_state_t *state,
	       size_t aux_size, uint32_t cmd,
	       void (*func)(dmsg_msg_t *), void *data)
{
	dmsg_iocom_t *iocom = state->iocom;
	dmsg_state_t *pstate;
	dmsg_msg_t *msg;
	int hbytes;
	size_t aligned_size;

	aligned_size = DMSG_DOALIGN(aux_size);
	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
		/*
		 * When CREATE is set without REPLY the caller is
		 * initiating a new transaction stacked under the specified
		 * circuit.
		 *
		 * It is possible to race a circuit failure, inherit the
		 * parent's STATE_DYING flag to trigger an abort sequence
		 * in the transmit path.  By not inheriting ABORTING the
		 * abort sequence can recurse.
		 *
		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
		 */
		pstate = state;
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		atomic_add_int(&dmsg_state_count, 1);

		TAILQ_INIT(&state->subq);
		state->parent = pstate;
		state->iocom = iocom;
		state->flags = DMSG_STATE_DYNAMIC;
		/* msgid is simply the state's own address: unique per iocom */
		state->msgid = (uint64_t)(uintptr_t)state;
		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
		state->rxcmd = DMSGF_REPLY;
		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
		state->func = func;
		state->any.any = data;

		state->flags |= DMSG_STATE_SUBINSERTED |
				DMSG_STATE_RBINSERTED;
		state->flags |= pstate->flags & DMSG_STATE_DYING;
		/* first child on parent's subq also holds the parent */
		if (TAILQ_EMPTY(&pstate->subq))
			dmsg_state_hold(pstate);
		RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
		dmsg_state_hold(state);		/* state on pstate->subq */
		dmsg_state_hold(state);		/* state on rbtree */
		dmsg_state_hold(state);		/* msg->state */
	} else {
		/*
		 * Otherwise the message is transmitted over the existing
		 * open transaction.
		 */
		pstate = state->parent;
		dmsg_state_hold(state);		/* msg->state */
	}

	/* XXX SMP race for state */
	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
	assert((size_t)hbytes >= sizeof(struct dmsg_hdr));
	msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes);
	bzero(msg, offsetof(struct dmsg_msg, any.head));

	/*
	 * [re]allocate the auxillary data buffer.  The caller knows that
	 * a size-aligned buffer will be allocated but we do not want to
	 * force the caller to zero any tail piece, so we do that ourself.
	 *
	 * NOTE(review): msg was just bzero'd above so aux_size/aux_data
	 * start at 0/NULL; the "re" allocation path looks like a holdover
	 * from a pooled allocator — confirm before simplifying.
	 */
	if (msg->aux_size != aux_size) {
		if (msg->aux_data) {
			free(msg->aux_data);
			msg->aux_data = NULL;
			msg->aux_size = 0;
		}
		if (aux_size) {
			msg->aux_data = malloc(aligned_size);
			msg->aux_size = aux_size;
			if (aux_size != aligned_size) {
				bzero(msg->aux_data + aux_size,
				      aligned_size - aux_size);
			}
		}
	}

	/*
	 * Set REVTRANS if the transaction was remotely initiated
	 * Set REVCIRC if the circuit was remotely initiated
	 */
	if (state->flags & DMSG_STATE_OPPOSITE)
		cmd |= DMSGF_REVTRANS;
	if (pstate->flags & DMSG_STATE_OPPOSITE)
		cmd |= DMSGF_REVCIRC;

	/*
	 * Finish filling out the header.
	 */
	bzero(&msg->any.head, hbytes);
	msg->hdr_size = hbytes;
	msg->any.head.magic = DMSG_HDR_MAGIC;
	msg->any.head.cmd = cmd;
	msg->any.head.aux_descr = 0;
	msg->any.head.aux_crc = 0;
	msg->any.head.msgid = state->msgid;
	msg->any.head.circuit = pstate->msgid;
	msg->state = state;

	return (msg);
}

/*
 * Free a message so it can be reused afresh.
 *
 * NOTE: aux_size can be 0 with a non-NULL aux_data.
 *
 * Drops the msg->state reference and releases the aux buffer and the
 * message itself.  Caller holds iocom->mtx.
 */
static
void
dmsg_msg_free_locked(dmsg_msg_t *msg)
{
	dmsg_state_t *state;

	if ((state = msg->state) != NULL) {
		dmsg_state_drop(state);
		msg->state = NULL;	/* safety */
	}
	if (msg->aux_data) {
		free(msg->aux_data);
		msg->aux_data = NULL;	/* safety */
	}
	msg->aux_size = 0;
	free(msg);
}

/*
 * Locked wrapper for dmsg_msg_free_locked().
 */
void
dmsg_msg_free(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->state->iocom;

	pthread_mutex_lock(&iocom->mtx);
	dmsg_msg_free_locked(msg);
	pthread_mutex_unlock(&iocom->mtx);
}

/*
 * I/O core loop for an iocom.
 *
 * Thread localized, iocom->mtx not held.
 */
void
dmsg_iocom_core(dmsg_iocom_t *iocom)
{
	struct pollfd fds[3];
	char dummybuf[256];
	dmsg_msg_t *msg;
	int timeout;
	int count;
	int wi;	/* wakeup pipe */
	int si;	/* socket */
	int ai;	/* alt bulk path socket */

	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
		/*
		 * These iocom->flags are only manipulated within the
		 * context of the current thread.  However, modifications
		 * still require atomic ops.
		 */
		dmio_printf(iocom, 5, "iocom %p %08x\n",
			    iocom, iocom->flags);
		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
				     DMSG_IOCOMF_WWORK |
				     DMSG_IOCOMF_PWORK |
				     DMSG_IOCOMF_SWORK |
				     DMSG_IOCOMF_ARWORK |
				     DMSG_IOCOMF_AWWORK)) == 0) {
			/*
			 * Only poll if no immediate work is pending.
			 * Otherwise we are just wasting our time calling
			 * poll.
			 */
			timeout = 5000;

			count = 0;
			wi = -1;
			si = -1;
			ai = -1;

			/*
			 * Always check the inter-thread pipe, e.g.
			 * for iocom->txmsgq work.
			 */
			wi = count++;
			fds[wi].fd = iocom->wakeupfds[0];
			fds[wi].events = POLLIN;
			fds[wi].revents = 0;

			/*
			 * Check the socket input/output direction as
			 * requested
			 */
			if (iocom->flags & (DMSG_IOCOMF_RREQ |
					    DMSG_IOCOMF_WREQ)) {
				si = count++;
				fds[si].fd = iocom->sock_fd;
				fds[si].events = 0;
				fds[si].revents = 0;

				if (iocom->flags & DMSG_IOCOMF_RREQ)
					fds[si].events |= POLLIN;
				if (iocom->flags & DMSG_IOCOMF_WREQ)
					fds[si].events |= POLLOUT;
			}

			/*
			 * Check the alternative fd for work.
			 */
			if (iocom->alt_fd >= 0) {
				ai = count++;
				fds[ai].fd = iocom->alt_fd;
				fds[ai].events = POLLIN;
				fds[ai].revents = 0;
			}
			/*
			 * Return value deliberately ignored: a timeout or
			 * EINTR simply leaves all revents clear and we
			 * loop again.
			 */
			poll(fds, count, timeout);

			if (wi >= 0 && (fds[wi].revents & POLLIN))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_PWORK);
			if (si >= 0 && (fds[si].revents & POLLIN))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_RWORK);
			if (si >= 0 && (fds[si].revents & POLLOUT))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_WWORK);
			if (wi >= 0 && (fds[wi].revents & POLLOUT))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_WWORK);
			if (ai >= 0 && (fds[ai].revents & POLLIN))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_ARWORK);
		} else {
			/*
			 * Always check the pipe
			 */
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
		}

		if (iocom->flags & DMSG_IOCOMF_SWORK) {
			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
			iocom->signal_callback(iocom);
		}

		/*
		 * Pending message queues from other threads wake us up
		 * with a write to the wakeupfds[] pipe.  We have to clear
		 * the pipe with a dummy read.
		 */
		if (iocom->flags & DMSG_IOCOMF_PWORK) {
			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
		}

		/*
		 * Message write sequencing
		 */
		if (iocom->flags & DMSG_IOCOMF_WWORK)
			dmsg_iocom_flush1(iocom);

		/*
		 * Message read sequencing.  Run this after the write
		 * sequencing in case the write sequencing allowed another
		 * auto-DELETE to occur on the read side.
		 */
		if (iocom->flags & DMSG_IOCOMF_RWORK) {
			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
			       (msg = dmsg_ioq_read(iocom)) != NULL) {
				dmio_printf(iocom, 4, "receive %s\n",
					    dmsg_msg_str(msg));
				iocom->rcvmsg_callback(msg);
				pthread_mutex_lock(&iocom->mtx);
				dmsg_state_cleanuprx(iocom, msg);
				pthread_mutex_unlock(&iocom->mtx);
			}
		}

		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
			iocom->altmsg_callback(iocom);
		}
	}
}

/*
 * Make sure there's enough room in the FIFO to hold the
 * needed data.
 *
 * Assume worst case encrypted form is 2x the size of the
 * plaintext equivalent.
 *
 * Compacts the FIFO in-place: already-decrypted bytes
 * [fifo_beg,fifo_cdx) slide to the front, then any not-yet-decrypted
 * bytes [fifo_cdn,fifo_end) slide down behind them.  Returns the number
 * of bytes of free space at the tail of the buffer.
 */
static
size_t
dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
{
	size_t bytes;
	size_t nmax;

	bytes = ioq->fifo_cdx - ioq->fifo_beg;
	nmax = sizeof(ioq->buf) - ioq->fifo_end;
	if (bytes + nmax / 2 < needed) {
		if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg,
			      ioq->buf,
			      bytes);
		}
		ioq->fifo_cdx -= ioq->fifo_beg;
		ioq->fifo_beg = 0;
		if (ioq->fifo_cdn < ioq->fifo_end) {
			bcopy(ioq->buf + ioq->fifo_cdn,
			      ioq->buf + ioq->fifo_cdx,
			      ioq->fifo_end - ioq->fifo_cdn);
		}
		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
		ioq->fifo_cdn = ioq->fifo_cdx;
		nmax = sizeof(ioq->buf) - ioq->fifo_end;
	}
	return(nmax);
}

/*
 * Read the next ready message from the ioq, issuing I/O if needed.
 * Caller should retry on a read-event when NULL is returned.
 *
 * If an error occurs during reception a DMSG_LNK_ERROR msg will
 * be returned for each open transaction, then the ioq and iocom
 * will be errored out and a non-transactional DMSG_LNK_ERROR
 * msg will be returned as the final message.  The caller should not call
 * us again after the final message is returned.
 *
 * Thread localized, iocom->mtx not held.
 */
dmsg_msg_t *
dmsg_ioq_read(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_rx;
	dmsg_msg_t *msg;
	dmsg_hdr_t *head;
	ssize_t n;
	size_t bytes;
	size_t nmax;
	uint32_t aux_size;
	uint32_t xcrc32;
	int error;

again:
	/*
	 * If a message is already pending we can just remove and
	 * return it.  Message state has already been processed.
	 * (currently not implemented)
	 */
	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);

		/* a state0 message here is the final, terminal LNK_ERROR */
		if (msg->state == &iocom->state0) {
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
			dmio_printf(iocom, 1,
				    "EOF ON SOCKET %d\n",
				    iocom->sock_fd);
		}
		return (msg);
	}
	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);

	/*
	 * If the stream is errored out we stop processing it.
	 */
	if (ioq->error)
		goto skip;

	/*
	 * Message read in-progress (msg is NULL at the moment).  We don't
	 * allocate a msg until we have its core header.
	 */
	nmax = sizeof(ioq->buf) - ioq->fifo_end;
	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
	msg = ioq->msg;

	switch(ioq->state) {
	case DMSG_MSGQ_STATE_HEADER1:
		/*
		 * Load the primary header, fail on any non-trivial read
		 * error or on EOF.  Since the primary header is the same
		 * size is the message alignment it will never straddle
		 * the end of the buffer.
		 *
		 * NOTE: sizeof(msg->any.head) is safe here even though
		 *	 msg is NULL; sizeof does not evaluate its operand.
		 */
		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
		if (bytes < sizeof(msg->any.head)) {
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		/*
		 * Decrypt data received so far.  Data will be decrypted
		 * in-place but might create gaps in the FIFO.  Partial
		 * blocks are not immediately decrypted.
		 *
		 * WARNING!  The header might be in the wrong endian, we
		 *	     do not fix it up until we get the entire
		 *	     extended header.
		 */
		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/*
		 * Insufficient data accumulated (msg is NULL, caller will
		 * retry on event).
		 */
		assert(msg == NULL);
		if (bytes < sizeof(msg->any.head))
			break;

		/*
		 * Check and fixup the core header.  Note that the icrc
		 * has to be calculated before any fixups, but the crc
		 * fields in the msg may have to be swapped like everything
		 * else.
		 */
		head = (void *)(ioq->buf + ioq->fifo_beg);
		if (head->magic != DMSG_HDR_MAGIC &&
		    head->magic != DMSG_HDR_MAGIC_REV) {
			dmio_printf(iocom, 1,
				    "%s: head->magic is bad %02x\n",
				    iocom->label, head->magic);
			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
				dmio_printf(iocom, 1, "%s\n",
					    "(on encrypted link)");
			ioq->error = DMSG_IOQ_ERROR_SYNC;
			break;
		}

		/*
		 * Calculate the full header size and aux data size
		 */
		if (head->magic == DMSG_HDR_MAGIC_REV) {
			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
				      DMSG_ALIGN;
			aux_size = bswap32(head->aux_bytes);
		} else {
			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
				      DMSG_ALIGN;
			aux_size = head->aux_bytes;
		}
		ioq->abytes = DMSG_DOALIGN(aux_size);
		ioq->unaligned_aux_size = aux_size;
		if (ioq->hbytes < sizeof(msg->any.head) ||
		    ioq->hbytes > sizeof(msg->any) ||
		    ioq->abytes > DMSG_AUX_MAX) {
			ioq->error = DMSG_IOQ_ERROR_FIELD;
			break;
		}

		/*
		 * Allocate the message, the next state will fill it in.
		 *
		 * NOTE: The aux_data buffer will be sized to an aligned
		 *	 value and the aligned remainder zero'd for
		 *	 convenience.
		 *
		 * NOTE: Supply dummy state and a degenerate cmd without
		 *	 CREATE set.  The message will temporarily be
		 *	 associated with state0 until later post-processing.
		 */
		msg = dmsg_msg_alloc(&iocom->state0, aux_size,
				     ioq->hbytes / DMSG_ALIGN,
				     NULL, NULL);
		ioq->msg = msg;

		/*
		 * Fall through to the next state.  Make sure that the
		 * extended header does not straddle the end of the buffer.
		 * We still want to issue larger reads into our buffer,
		 * book-keeping is easier if we don't bcopy() yet.
		 *
		 * Make sure there is enough room for bloated encrypt data.
		 */
		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
		ioq->state = DMSG_MSGQ_STATE_HEADER2;
		/* fall through */
	case DMSG_MSGQ_STATE_HEADER2:
		/*
		 * Fill out the extended header.
		 */
		assert(msg != NULL);
		if (bytes < ioq->hbytes) {
			assert(nmax > 0);
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/*
		 * Insufficient data accumulated (set msg NULL so caller will
		 * retry on event).
		 */
		if (bytes < ioq->hbytes) {
			msg = NULL;
			break;
		}

		/*
		 * Calculate the extended header, decrypt data received
		 * so far.  Handle endian-conversion for the entire extended
		 * header.
		 */
		head = (void *)(ioq->buf + ioq->fifo_beg);

		/*
		 * Check the CRC.  hdr_crc is zeroed for the calculation
		 * and restored afterwards.
		 */
		if (head->magic == DMSG_HDR_MAGIC_REV)
			xcrc32 = bswap32(head->hdr_crc);
		else
			xcrc32 = head->hdr_crc;
		head->hdr_crc = 0;
		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
			ioq->error = DMSG_IOQ_ERROR_XCRC;
			dmio_printf(iocom, 1, "BAD-XCRC(%08x,%08x) %s\n",
				    xcrc32, dmsg_icrc32(head, ioq->hbytes),
				    dmsg_msg_str(msg));
			assert(0);
			break;
		}
		head->hdr_crc = xcrc32;

		if (head->magic == DMSG_HDR_MAGIC_REV) {
			dmsg_bswap_head(head);
		}

		/*
		 * Copy the extended header into the msg and adjust the
		 * FIFO.
		 */
		bcopy(head, &msg->any, ioq->hbytes);

		/*
		 * We are either done or we fall-through.
		 */
		if (ioq->abytes == 0) {
			ioq->fifo_beg += ioq->hbytes;
			break;
		}

		/*
		 * Must adjust bytes (and the state) when falling through.
		 * nmax doesn't change.
		 */
		ioq->fifo_beg += ioq->hbytes;
		bytes -= ioq->hbytes;
		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
		/* fall through */
	case DMSG_MSGQ_STATE_AUXDATA1:
		/*
		 * Copy the partial or complete [decrypted] payload from
		 * remaining bytes in the FIFO in order to optimize the
		 * makeroom call in the AUXDATA2 state.  We have to
		 * fall-through either way so we can check the crc.
		 *
		 * msg->aux_size tracks our aux data.
		 *
		 * (Lets not complicate matters if the data is encrypted,
		 * since the data in-stream is not the same size as the
		 * data decrypted).
		 */
		if (bytes >= ioq->abytes) {
			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
			      ioq->abytes);
			msg->aux_size = ioq->abytes;
			ioq->fifo_beg += ioq->abytes;
			assert(ioq->fifo_beg <= ioq->fifo_cdx);
			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
			bytes -= ioq->abytes;
		} else if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
			      bytes);
			msg->aux_size = bytes;
			ioq->fifo_beg += bytes;
			if (ioq->fifo_cdx < ioq->fifo_beg)
				ioq->fifo_cdx = ioq->fifo_beg;
			assert(ioq->fifo_beg <= ioq->fifo_cdx);
			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
			bytes = 0;
		} else {
			msg->aux_size = 0;
		}
		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
		/* fall through */
	case DMSG_MSGQ_STATE_AUXDATA2:
		/*
		 * Make sure there is enough room for more data.
		 */
		assert(msg);
		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);

		/*
		 * Read and decrypt more of the payload.
		 */
		if (msg->aux_size < ioq->abytes) {
			assert(nmax > 0);
			assert(bytes == 0);
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		if (bytes > ioq->abytes - msg->aux_size)
			bytes = ioq->abytes - msg->aux_size;

		if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg,
			      msg->aux_data + msg->aux_size,
			      bytes);
			msg->aux_size += bytes;
			ioq->fifo_beg += bytes;
		}

		/*
		 * Insufficient data accumulated (set msg NULL so caller will
		 * retry on event).
		 *
		 * Assert the auxillary data size is correct, then record the
		 * original unaligned size from the message header.
		 */
		if (msg->aux_size < ioq->abytes) {
			msg = NULL;
			break;
		}
		assert(msg->aux_size == ioq->abytes);
		msg->aux_size = ioq->unaligned_aux_size;

		/*
		 * Check aux_crc, then we are done.  Note that the crc
		 * is calculated over the aligned size, not the actual
		 * size.
		 */
		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
		if (xcrc32 != msg->any.head.aux_crc) {
			ioq->error = DMSG_IOQ_ERROR_ACRC;
			dmio_printf(iocom, 1,
				    "iocom: ACRC error %08x vs %08x "
				    "msgid %016jx msgcmd %08x auxsize %d\n",
				    xcrc32,
				    msg->any.head.aux_crc,
				    (intmax_t)msg->any.head.msgid,
				    msg->any.head.cmd,
				    msg->any.head.aux_bytes);
			break;
		}
		break;
	case DMSG_MSGQ_STATE_ERROR:
		/*
		 * Continued calls to drain recorded transactions (returning
		 * a LNK_ERROR for each one), before we return the final
		 * LNK_ERROR.
		 */
		assert(msg == NULL);
		break;
	default:
		/*
		 * We don't double-return errors, the caller should not
		 * have called us again after getting an error msg.
		 */
		assert(0);
		break;
	}

	/*
	 * Check the message sequence.  The iv[] should prevent any
	 * possibility of a replay but we add this check anyway.
	 */
	if (msg && ioq->error == 0) {
		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
		} else {
			++ioq->seq;
		}
	}

	/*
	 * Handle error, RREQ, or completion
	 *
	 * NOTE: nmax and bytes are invalid at this point, we don't bother
	 *	 to update them when breaking out.
	 */
	if (ioq->error) {
skip:
		/*
		 * An unrecoverable error causes all active receive
		 * transactions to be terminated with a LNK_ERROR message.
		 *
		 * Once all active transactions are exhausted we set the
		 * iocom ERROR flag and return a non-transactional LNK_ERROR
		 * message, which should cause master processing loops to
		 * terminate.
		 */
		dmio_printf(iocom, 1, "IOQ ERROR %d\n", ioq->error);
		assert(ioq->msg == msg);
		if (msg) {
			dmsg_msg_free(msg);
			ioq->msg = NULL;
			msg = NULL;
		}

		/*
		 * No more I/O read processing
		 */
		ioq->state = DMSG_MSGQ_STATE_ERROR;

		/*
		 * Simulate a remote LNK_ERROR DELETE msg for any open
		 * transactions, ending with a final non-transactional
		 * LNK_ERROR (that the session can detect) when no
		 * transactions remain.
		 *
		 * NOTE: Temporarily supply state0 and a degenerate cmd
		 *	 without CREATE set.  The real state will be
		 *	 assigned in the loop.
		 *
		 * NOTE: We are simulating a received message using our
		 *	 side of the state, so the DMSGF_REV* bits have
		 *	 to be reversed.
		 */
		pthread_mutex_lock(&iocom->mtx);
		dmsg_iocom_drain(iocom);
		dmsg_simulate_failure(&iocom->state0, 0, ioq->error);
		pthread_mutex_unlock(&iocom->mtx);
		if (TAILQ_FIRST(&ioq->msgq))
			goto again;

#if 0
		/*
		 * For the iocom error case we want to set RWORK to indicate
		 * that more messages might be pending.
		 *
		 * It is possible to return NULL when there is more work to
		 * do because each message has to be DELETEd in both
		 * directions before we continue on with the next (though
		 * this could be optimized).  The transmit direction will
		 * re-set RWORK.
		 */
		if (msg)
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
#endif
	} else if (msg == NULL) {
		/*
		 * Insufficient data received to finish building the message,
		 * set RREQ and return NULL.
		 *
		 * Leave ioq->msg intact.
		 * Leave the FIFO intact.
		 */
		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
	} else {
		/*
		 * Continue processing msg.
		 *
		 * The fifo has already been advanced past the message.
		 * Trivially reset the FIFO indices if possible.
		 *
		 * clear the FIFO if it is now empty and set RREQ to wait
		 * for more from the socket.  If the FIFO is not empty set
		 * TWORK to bypass the poll so we loop immediately.
		 */
		if (ioq->fifo_beg == ioq->fifo_cdx &&
		    ioq->fifo_cdn == ioq->fifo_end) {
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
			ioq->fifo_cdx = 0;
			ioq->fifo_cdn = 0;
			ioq->fifo_beg = 0;
			ioq->fifo_end = 0;
		} else {
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
		}
		ioq->state = DMSG_MSGQ_STATE_HEADER1;
		ioq->msg = NULL;

		/*
		 * Handle message routing.  Validates non-zero sources
		 * and routes message.  Error will be 0 if the message is
		 * destined for us.
		 *
		 * State processing only occurs for messages destined for us.
		 */
		dmio_printf(iocom, 5,
			    "rxmsg cmd=%08x circ=%016jx\n",
			    msg->any.head.cmd,
			    (intmax_t)msg->any.head.circuit);

		error = dmsg_state_msgrx(msg, 0);

		if (error) {
			/*
			 * Abort-after-closure, throw message away and
			 * start reading another.
			 */
			if (error == DMSG_IOQ_ERROR_EALREADY) {
				dmsg_msg_free(msg);
				goto again;
			}

			/*
			 * Process real error and throw away message.
			 */
			ioq->error = error;
			goto skip;
		}

		/*
		 * No error and not routed
		 */
		/* no error, not routed.  Fall through and return msg */
	}
	return (msg);
}

/*
 * Calculate the header and data crc's and write a low-level message to
 * the connection.  If aux_crc is non-zero the aux_data crc is already
 * assumed to have been set.
 *
 * A non-NULL msg is added to the queue but not necessarily flushed.
 * Calling this function with msg == NULL will get a flush going.
 *
 * (called from iocom_core only)
 */
void
dmsg_iocom_flush1(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;
	uint32_t xcrc32;
	size_t hbytes;
	size_t abytes;
	dmsg_msg_queue_t tmpq;

	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);

	/*
	 * Drain txmsgq into a local queue under the mutex so the CRC and
	 * encryption work below can run with the mutex released.
	 */
	TAILQ_INIT(&tmpq);
	pthread_mutex_lock(&iocom->mtx);
	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
	}
	pthread_mutex_unlock(&iocom->mtx);

	/*
	 * Flush queue, doing all required encryption and CRC generation,
	 * with the mutex unlocked.
	 */
	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
		/*
		 * Process terminal connection errors.  Queued messages are
		 * still moved to ioq->msgq so dmsg_iocom_drain() can free
		 * them.
		 */
		TAILQ_REMOVE(&tmpq, msg, qentry);
		if (ioq->error) {
			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
			++ioq->msgcount;
			continue;
		}

		/*
		 * Finish populating the msg fields.  The salt ensures that
		 * the iv[] array is ridiculously randomized and we also
		 * re-seed our PRNG every 32768 messages just to be sure.
		 */
		msg->any.head.magic = DMSG_HDR_MAGIC;
		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
		++ioq->seq;
		if ((ioq->seq & 32767) == 0) {
			pthread_mutex_lock(&iocom->mtx);
			srandomdev();
			pthread_mutex_unlock(&iocom->mtx);
		}

		/*
		 * Calculate aux_crc if 0, then calculate hdr_crc.
		 * (hdr_crc is computed with the hdr_crc field itself zeroed)
		 */
		if (msg->aux_size && msg->any.head.aux_crc == 0) {
			abytes = DMSG_DOALIGN(msg->aux_size);
			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
			msg->any.head.aux_crc = xcrc32;
		}
		msg->any.head.aux_bytes = msg->aux_size;

		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		msg->any.head.hdr_crc = 0;
		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);

		/*
		 * Enqueue the message (the flush codes handles stream
		 * encryption).
		 */
		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
		++ioq->msgcount;
	}
	dmsg_iocom_flush2(iocom);
}

/*
 * Thread localized, iocom->mtx not held by caller.
 *
 * (called from iocom_core via iocom_flush1 only)
 */
void
dmsg_iocom_flush2(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;
	ssize_t n;
	struct iovec iov[DMSG_IOQ_MAXIOVEC];
	size_t nact;
	size_t hbytes;
	size_t abytes;
	size_t hoff;
	size_t aoff;
	int iovcnt;
	int save_errno;

	if (ioq->error) {
		dmsg_iocom_drain(iocom);
		return;
	}

	/*
	 * Pump messages out the connection by building an iovec.
	 *
	 * ioq->hbytes/ioq->abytes tracks how much of the first message
	 * in the queue has been successfully written out, so we can
	 * resume writing.
	 */
	iovcnt = 0;
	nact = 0;
	hoff = ioq->hbytes;
	aoff = ioq->abytes;

	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		abytes = DMSG_DOALIGN(msg->aux_size);
		assert(hoff <= hbytes && aoff <= abytes);

		if (hoff < hbytes) {
			size_t maxlen = hbytes - hoff;
			/* cap each segment to half the FIFO buffer */
			if (maxlen > sizeof(ioq->buf) / 2)
				maxlen = sizeof(ioq->buf) / 2;
			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
			iov[iovcnt].iov_len = maxlen;
			nact += maxlen;
			++iovcnt;
			/* stop when out of iovs or the header was clipped */
			if (iovcnt == DMSG_IOQ_MAXIOVEC ||
			    maxlen != hbytes - hoff) {
				break;
			}
		}
		if (aoff < abytes) {
			size_t maxlen = abytes - aoff;
			if (maxlen > sizeof(ioq->buf) / 2)
				maxlen = sizeof(ioq->buf) / 2;

			assert(msg->aux_data != NULL);
			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
			iov[iovcnt].iov_len = maxlen;
			nact += maxlen;
			++iovcnt;
			if (iovcnt == DMSG_IOQ_MAXIOVEC ||
			    maxlen != abytes - aoff) {
				break;
			}
		}
		/* subsequent messages start at offset 0 */
		hoff = 0;
		aoff = 0;
	}

	/*
	 * Shortcut if no work to do.  Be sure to check for old work still
	 * pending in the FIFO.
	 */
	if (iovcnt == 0 && ioq->fifo_beg == ioq->fifo_cdx)
		return;

	/*
	 * Encrypt and write the data.  The crypto code will move the
	 * data into the fifo and adjust the iov as necessary.  If
	 * encryption is disabled the iov is left alone.
	 *
	 * May return a smaller iov (thus a smaller n), with aggregated
	 * chunks.  May reduce nmax to what fits in the FIFO.
	 *
	 * This function sets nact to the number of original bytes now
	 * encrypted, adding to the FIFO some number of bytes that might
	 * be greater depending on the crypto mechanic.  iov[] is adjusted
	 * to point at the FIFO if necessary.
	 *
	 * NOTE: nact is the number of bytes eaten from the message.  For
	 *	 encrypted data this is the number of bytes processed for
	 *	 encryption and not necessarily the number of bytes writable.
	 *	 The return value from the writev() is the post-encrypted
	 *	 byte count which might be larger.
	 *
	 * NOTE: For direct writes, nact is the return value from the writev().
	 */
	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
		/*
		 * Make sure the FIFO has a reasonable amount of space
		 * left (if not completely full).
		 *
		 * In this situation we are staging the encrypted message
		 * data in the FIFO.  (nact) represents how much plaintext
		 * has been staged, (n) represents how much encrypted data
		 * has been flushed.  The two are independent of each other.
		 */
		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
			      ioq->fifo_end - ioq->fifo_beg);
			ioq->fifo_cdx -= ioq->fifo_beg;
			ioq->fifo_cdn -= ioq->fifo_beg;
			ioq->fifo_end -= ioq->fifo_beg;
			ioq->fifo_beg = 0;
		}

		/*
		 * beg .... cdx ............ cdn ............. end
		 * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
		 *
		 * Advance fifo_beg on a successful write.
		 */
		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
		n = writev(iocom->sock_fd, iov, iovcnt);
		save_errno = errno;	/* errno may be clobbered below */
		if (n > 0) {
			ioq->fifo_beg += n;
			if (ioq->fifo_beg == ioq->fifo_end) {
				ioq->fifo_beg = 0;
				ioq->fifo_cdn = 0;
				ioq->fifo_cdx = 0;
				ioq->fifo_end = 0;
			}
		}

		/*
		 * We don't mess with the nact returned by the crypto_encrypt
		 * call, which represents the filling of the FIFO.  (n) tells
		 * us how much we were able to write from the FIFO.  The two
		 * are different beasts when encrypting.
		 */
	} else {
		/*
		 * In this situation we are not staging the messages to the
		 * FIFO but instead writing them directly from the msg
		 * structure(s) unencrypted, so (nact) is basically (n).
		 */
		n = writev(iocom->sock_fd, iov, iovcnt);
		save_errno = errno;
		if (n > 0)
			nact = n;
		else
			nact = 0;
	}

	/*
	 * Clean out the transmit queue based on what we successfully
	 * encrypted (nact is the plaintext count) and is now in the FIFO.
	 * ioq->hbytes/abytes represents the portion of the first message
	 * previously sent.
	 */
	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		abytes = DMSG_DOALIGN(msg->aux_size);

		if ((size_t)nact < hbytes - ioq->hbytes) {
			ioq->hbytes += nact;
			nact = 0;
			break;
		}
		nact -= hbytes - ioq->hbytes;
		ioq->hbytes = hbytes;
		if ((size_t)nact < abytes - ioq->abytes) {
			ioq->abytes += nact;
			nact = 0;
			break;
		}
		nact -= abytes - ioq->abytes;
		/* ioq->abytes = abytes; optimized out */

		dmio_printf(iocom, 5,
			    "txmsg cmd=%08x circ=%016jx\n",
			    msg->any.head.cmd,
			    (intmax_t)msg->any.head.circuit);

#ifdef DMSG_BLOCK_DEBUG
		uint32_t tcmd;

		if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
			if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
				tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
				       (msg->any.head.cmd & (DMSGF_CREATE |
							     DMSGF_DELETE |
							     DMSGF_REPLY));
			} else {
				tcmd = 0;
			}
		} else {
			tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
		}

		switch (tcmd) {
		case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
		case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
			dmio_printf(iocom, 4,
				    "write BIO %-3d %016jx %d@%016jx\n",
				    biocount, msg->any.head.msgid,
				    msg->any.blk_read.bytes,
				    msg->any.blk_read.offset);
			break;
		case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
		case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
			dmio_printf(iocom, 4,
				    "wretr BIO %-3d %016jx %d@%016jx\n",
				    biocount, msg->any.head.msgid,
				    msg->any.blk_read.bytes,
				    msg->any.blk_read.offset);
			break;
		default:
			break;
		}
#endif

		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		--ioq->msgcount;
		ioq->hbytes = 0;
		ioq->abytes = 0;
		dmsg_msg_free(msg);
	}
	assert(nact == 0);

	/*
	 * Process the return value from the write w/regards to blocking.
	 */
	if (n < 0) {
		if (save_errno != EINTR &&
		    save_errno != EINPROGRESS &&
		    save_errno != EAGAIN) {
			/*
			 * Fatal write error
			 */
			ioq->error = DMSG_IOQ_ERROR_SOCK;
			dmsg_iocom_drain(iocom);
		} else {
			/*
			 * Wait for socket buffer space, do not try to
			 * process more packets for transmit until space
			 * is available.
			 */
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
		}
	} else if (TAILQ_FIRST(&ioq->msgq) ||
		   TAILQ_FIRST(&iocom->txmsgq) ||
		   ioq->fifo_beg != ioq->fifo_cdx) {
		/*
		 * If the write succeeded and more messages are pending
		 * in either msgq, or the FIFO WWORK must remain set.
		 */
		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
	}
	/* else no transmit-side work remains */

	if (ioq->error) {
		dmsg_iocom_drain(iocom);
	}
}

/*
 * Kill pending msgs on ioq_tx and adjust the flags such that no more
 * write events will occur.  We don't kill read msgs because we want
 * the caller to pull off our contrived terminal error msg to detect
 * the connection failure.
 *
 * Localized to iocom_core thread, iocom->mtx not held by caller.
1531 */ 1532 void 1533 dmsg_iocom_drain(dmsg_iocom_t *iocom) 1534 { 1535 dmsg_ioq_t *ioq = &iocom->ioq_tx; 1536 dmsg_msg_t *msg; 1537 1538 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); 1539 ioq->hbytes = 0; 1540 ioq->abytes = 0; 1541 1542 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { 1543 TAILQ_REMOVE(&ioq->msgq, msg, qentry); 1544 --ioq->msgcount; 1545 dmsg_msg_free(msg); 1546 } 1547 } 1548 1549 /* 1550 * Write a message to an iocom, with additional state processing. 1551 */ 1552 void 1553 dmsg_msg_write(dmsg_msg_t *msg) 1554 { 1555 dmsg_iocom_t *iocom = msg->state->iocom; 1556 dmsg_state_t *state; 1557 char dummy; 1558 1559 pthread_mutex_lock(&iocom->mtx); 1560 state = msg->state; 1561 1562 dmio_printf(iocom, 5, 1563 "msgtx: cmd=%08x msgid=%016jx " 1564 "state %p(%08x) error=%d\n", 1565 msg->any.head.cmd, msg->any.head.msgid, 1566 state, (state ? state->icmd : 0), 1567 msg->any.head.error); 1568 1569 1570 #if 0 1571 /* 1572 * Make sure the parent transaction is still open in the transmit 1573 * direction. If it isn't the message is dead and we have to 1574 * potentially simulate a rxmsg terminating the transaction. 1575 */ 1576 if ((state->parent->txcmd & DMSGF_DELETE) || 1577 (state->parent->rxcmd & DMSGF_DELETE)) { 1578 dmio_printf(iocom, 4, "dmsg_msg_write: EARLY TERMINATION\n"); 1579 dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK); 1580 dmsg_state_cleanuptx(iocom, msg); 1581 dmsg_msg_free(msg); 1582 pthread_mutex_unlock(&iocom->mtx); 1583 return; 1584 } 1585 #endif 1586 /* 1587 * Process state data into the message as needed, then update the 1588 * state based on the message. 1589 */ 1590 if ((state->flags & DMSG_STATE_ROOT) == 0) { 1591 /* 1592 * Existing transaction (could be reply). It is also 1593 * possible for this to be the first reply (CREATE is set), 1594 * in which case we populate state->txcmd. 1595 * 1596 * state->txcmd is adjusted to hold the final message cmd, 1597 * and we also be sure to set the CREATE bit here. 
We did 1598 * not set it in dmsg_msg_alloc() because that would have 1599 * not been serialized (state could have gotten ripped out 1600 * from under the message prior to it being transmitted). 1601 */ 1602 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) == 1603 DMSGF_CREATE) { 1604 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; 1605 state->icmd = state->txcmd & DMSGF_BASECMDMASK; 1606 state->flags &= ~DMSG_STATE_NEW; 1607 } 1608 msg->any.head.msgid = state->msgid; 1609 1610 if (msg->any.head.cmd & DMSGF_CREATE) { 1611 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; 1612 } 1613 } 1614 1615 /* 1616 * Discard messages sent to transactions which are already dead. 1617 */ 1618 if (state && (state->txcmd & DMSGF_DELETE)) { 1619 dmio_printf(iocom, 4, 1620 "dmsg_msg_write: drop msg %08x to dead " 1621 "circuit state=%p\n", 1622 msg->any.head.cmd, state); 1623 dmsg_msg_free(msg); 1624 return; 1625 } 1626 1627 /* 1628 * Normally we queue the msg for output. However, if the circuit is 1629 * dead or dying we must simulate a failure in the return direction 1630 * and throw the message away. The other end is not expecting any 1631 * further messages from us on this state. 1632 * 1633 * Note that the I/O thread is responsible for generating the CRCs 1634 * and encryption. 1635 */ 1636 if (state->flags & DMSG_STATE_DYING) { 1637 #if 0 1638 if ((state->parent->txcmd & DMSGF_DELETE) || 1639 (state->parent->flags & DMSG_STATE_DYING) || 1640 (state->flags & DMSG_STATE_DYING)) { 1641 #endif 1642 /* 1643 * Illegal message, kill state and related sub-state. 1644 * Cannot transmit if state is already dying. 
1645 */ 1646 dmio_printf(iocom, 4, 1647 "dmsg_msg_write: Write to dying circuit " 1648 "ptxcmd=%08x prxcmd=%08x flags=%08x\n", 1649 state->parent->rxcmd, 1650 state->parent->txcmd, 1651 state->parent->flags); 1652 dmsg_state_hold(state); 1653 dmsg_state_cleanuptx(iocom, msg); 1654 if ((state->flags & DMSG_STATE_ABORTING) == 0) { 1655 dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK); 1656 } 1657 dmsg_state_drop(state); 1658 dmsg_msg_free(msg); 1659 } else { 1660 /* 1661 * Queue the message, clean up transmit state prior to queueing 1662 * to avoid SMP races. 1663 */ 1664 dmio_printf(iocom, 5, 1665 "dmsg_msg_write: commit msg state=%p to txkmsgq\n", 1666 state); 1667 dmsg_state_cleanuptx(iocom, msg); 1668 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry); 1669 dummy = 0; 1670 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ 1671 } 1672 pthread_mutex_unlock(&iocom->mtx); 1673 } 1674 1675 /* 1676 * Remove state from its parent's subq. This can wind up recursively 1677 * dropping the parent upward. 1678 * 1679 * NOTE: iocom must be locked. 1680 * 1681 * NOTE: Once we drop the parent, our pstate pointer may become invalid. 1682 */ 1683 static 1684 void 1685 dmsg_subq_delete(dmsg_state_t *state) 1686 { 1687 dmsg_state_t *pstate; 1688 1689 if (state->flags & DMSG_STATE_SUBINSERTED) { 1690 pstate = state->parent; 1691 assert(pstate); 1692 if (pstate->scan == state) 1693 pstate->scan = NULL; 1694 TAILQ_REMOVE(&pstate->subq, state, entry); 1695 state->flags &= ~DMSG_STATE_SUBINSERTED; 1696 state->parent = NULL; 1697 if (TAILQ_EMPTY(&pstate->subq)) 1698 dmsg_state_drop(pstate);/* pstate->subq */ 1699 pstate = NULL; /* safety */ 1700 dmsg_state_drop(state); /* pstate->subq */ 1701 } else { 1702 assert(state->parent == NULL); 1703 } 1704 } 1705 1706 /* 1707 * Simulate reception of a transaction DELETE message when the link goes 1708 * bad. This routine must recurse through state->subq and generate messages 1709 * and callbacks bottom-up. 
1710 * 1711 * iocom->mtx must be held by caller. 1712 */ 1713 static 1714 void 1715 dmsg_simulate_failure(dmsg_state_t *state, int meto, int error) 1716 { 1717 dmsg_state_t *substate; 1718 1719 dmsg_state_hold(state); 1720 if (meto) 1721 dmsg_state_abort(state); 1722 1723 /* 1724 * Recurse through sub-states. 1725 */ 1726 again: 1727 TAILQ_FOREACH(substate, &state->subq, entry) { 1728 if (substate->flags & DMSG_STATE_ABORTING) 1729 continue; 1730 state->scan = substate; 1731 dmsg_simulate_failure(substate, 1, error); 1732 if (state->scan != substate) 1733 goto again; 1734 } 1735 1736 dmsg_state_drop(state); 1737 } 1738 1739 static 1740 void 1741 dmsg_state_abort(dmsg_state_t *state) 1742 { 1743 dmsg_iocom_t *iocom; 1744 dmsg_msg_t *msg; 1745 1746 /* 1747 * Set ABORTING and DYING, return if already set. If the state was 1748 * just allocated we defer the abort operation until the related 1749 * message is processed. 1750 */ 1751 if (state->flags & DMSG_STATE_ABORTING) 1752 return; 1753 state->flags |= DMSG_STATE_ABORTING; 1754 dmsg_state_dying(state); 1755 if (state->flags & DMSG_STATE_NEW) { 1756 dmio_printf(iocom, 4, 1757 "dmsg_state_abort(0): state %p rxcmd %08x " 1758 "txcmd %08x flags %08x - in NEW state\n", 1759 state, state->rxcmd, 1760 state->txcmd, state->flags); 1761 return; 1762 } 1763 1764 /* 1765 * Simulate parent state failure before child states. Device 1766 * drivers need to understand this and flag the situation but might 1767 * have asynchronous operations in progress that they cannot stop. 1768 * To make things easier, parent states will not actually disappear 1769 * until the children are all gone. 
1770 */ 1771 if ((state->rxcmd & DMSGF_DELETE) == 0) { 1772 dmio_printf(iocom, 5, 1773 "dmsg_state_abort() on state %p\n", 1774 state); 1775 msg = dmsg_msg_alloc_locked(state, 0, DMSG_LNK_ERROR, 1776 NULL, NULL); 1777 if ((state->rxcmd & DMSGF_CREATE) == 0) 1778 msg->any.head.cmd |= DMSGF_CREATE; 1779 msg->any.head.cmd |= DMSGF_DELETE | 1780 (state->rxcmd & DMSGF_REPLY); 1781 msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC); 1782 msg->any.head.error = DMSG_ERR_LOSTLINK; 1783 msg->any.head.cmd |= DMSGF_ABORT; 1784 1785 /* 1786 * Issue callback synchronously even though this isn't 1787 * the receiver thread. We need to issue the callback 1788 * before removing state from the subq in order to allow 1789 * the callback to reply. 1790 */ 1791 iocom = state->iocom; 1792 dmsg_state_msgrx(msg, 1); 1793 pthread_mutex_unlock(&iocom->mtx); 1794 iocom->rcvmsg_callback(msg); 1795 pthread_mutex_lock(&iocom->mtx); 1796 dmsg_state_cleanuprx(iocom, msg); 1797 #if 0 1798 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry); 1799 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK); 1800 #endif 1801 } 1802 } 1803 1804 1805 /* 1806 * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing 1807 * the transmission of any new messages on these states. This is done 1808 * atomically when parent state is terminating, whereas setting ABORTING is 1809 * not atomic and can leak races. 1810 */ 1811 static 1812 void 1813 dmsg_state_dying(dmsg_state_t *state) 1814 { 1815 dmsg_state_t *scan; 1816 1817 if ((state->flags & DMSG_STATE_DYING) == 0) { 1818 state->flags |= DMSG_STATE_DYING; 1819 TAILQ_FOREACH(scan, &state->subq, entry) 1820 dmsg_state_dying(scan); 1821 } 1822 } 1823 1824 /* 1825 * This is a shortcut to formulate a reply to msg with a simple error code, 1826 * It can reply to and terminate a transaction, or it can reply to a one-way 1827 * messages. A DMSG_LNK_ERROR command code is utilized to encode 1828 * the error code (which can be 0). 
Not all transactions are terminated 1829 * with DMSG_LNK_ERROR status (the low level only cares about the 1830 * MSGF_DELETE flag), but most are. 1831 * 1832 * Replies to one-way messages are a bit of an oxymoron but the feature 1833 * is used by the debug (DBG) protocol. 1834 * 1835 * The reply contains no extended data. 1836 */ 1837 void 1838 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error) 1839 { 1840 dmsg_state_t *state = msg->state; 1841 dmsg_msg_t *nmsg; 1842 uint32_t cmd; 1843 1844 /* 1845 * Reply with a simple error code and terminate the transaction. 1846 */ 1847 cmd = DMSG_LNK_ERROR; 1848 1849 /* 1850 * Check if our direction has even been initiated yet, set CREATE. 1851 * 1852 * Check what direction this is (command or reply direction). Note 1853 * that txcmd might not have been initiated yet. 1854 * 1855 * If our direction has already been closed we just return without 1856 * doing anything. 1857 */ 1858 if ((state->flags & DMSG_STATE_ROOT) == 0) { 1859 if (state->txcmd & DMSGF_DELETE) 1860 return; 1861 if (state->txcmd & DMSGF_REPLY) 1862 cmd |= DMSGF_REPLY; 1863 cmd |= DMSGF_DELETE; 1864 } else { 1865 if ((msg->any.head.cmd & DMSGF_REPLY) == 0) 1866 cmd |= DMSGF_REPLY; 1867 } 1868 1869 /* 1870 * Allocate the message and associate it with the existing state. 1871 * We cannot pass DMSGF_CREATE to msg_alloc() because that may 1872 * allocate new state. We have our state already. 1873 */ 1874 nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL); 1875 if ((state->flags & DMSG_STATE_ROOT) == 0) { 1876 if ((state->txcmd & DMSGF_CREATE) == 0) 1877 nmsg->any.head.cmd |= DMSGF_CREATE; 1878 } 1879 nmsg->any.head.error = error; 1880 1881 dmsg_msg_write(nmsg); 1882 } 1883 1884 /* 1885 * Similar to dmsg_msg_reply() but leave the transaction open. That is, 1886 * we are generating a streaming reply or an intermediate acknowledgement 1887 * of some sort as part of the higher level protocol, with more to come 1888 * later. 
 */
void
dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
{
	dmsg_state_t *state = msg->state;
	dmsg_msg_t *nmsg;
	uint32_t cmd;


	/*
	 * Reply with a simple error code.  Unlike dmsg_msg_reply() the
	 * transaction is left open (no DELETE is issued below).
	 */
	cmd = DMSG_LNK_ERROR;

	/*
	 * Check if our direction has even been initiated yet, set CREATE.
	 *
	 * Check what direction this is (command or reply direction).  Note
	 * that txcmd might not have been initiated yet.
	 *
	 * If our direction has already been closed we just return without
	 * doing anything.
	 */
	if ((state->flags & DMSG_STATE_ROOT) == 0) {
		if (state->txcmd & DMSGF_DELETE)
			return;
		if (state->txcmd & DMSGF_REPLY)
			cmd |= DMSGF_REPLY;
		/* continuing transaction, do not set MSGF_DELETE */
	} else {
		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
			cmd |= DMSGF_REPLY;
	}
	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
	if ((state->flags & DMSG_STATE_ROOT) == 0) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;

	dmsg_msg_write(nmsg);
}

/*
 * Terminate a transaction given a state structure by issuing a DELETE.
 * (the state structure must not be &iocom->state0)
 */
void
dmsg_state_reply(dmsg_state_t *state, uint32_t error)
{
	dmsg_msg_t *nmsg;
	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;

	/*
	 * Nothing to do if we already transmitted a delete
	 */
	if (state->txcmd & DMSGF_DELETE)
		return;

	/*
	 * Set REPLY if the other end initiated the command.  Otherwise
	 * we are the command direction.
	 */
	if (state->txcmd & DMSGF_REPLY)
		cmd |= DMSGF_REPLY;

	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
	if ((state->flags & DMSG_STATE_ROOT) == 0) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;
	dmsg_msg_write(nmsg);
}

/*
 * Post an intermediate result code on a transaction given a state
 * structure.  Unlike dmsg_state_reply() no DELETE is issued, so the
 * transaction remains open (the original comment here claimed a DELETE
 * was issued; the code sets only DMSG_LNK_ERROR).
 * (the state structure must not be &iocom->state0)
 */
void
dmsg_state_result(dmsg_state_t *state, uint32_t error)
{
	dmsg_msg_t *nmsg;
	uint32_t cmd = DMSG_LNK_ERROR;

	/*
	 * Nothing to do if we already transmitted a delete
	 */
	if (state->txcmd & DMSGF_DELETE)
		return;

	/*
	 * Set REPLY if the other end initiated the command.  Otherwise
	 * we are the command direction.
	 */
	if (state->txcmd & DMSGF_REPLY)
		cmd |= DMSGF_REPLY;

	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
	if ((state->flags & DMSG_STATE_ROOT) == 0) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;
	dmsg_msg_write(nmsg);
}

/************************************************************************
 *			TRANSACTION STATE HANDLING			*
 ************************************************************************
 *
 */

/*
 * Process state tracking for a message after reception, prior to execution.
 * Possibly route the message (consuming it).
 *
 * Called with msglk held and the msg dequeued.
 *
 * All messages are called with dummy state and return actual state.
 * (One-off messages often just return the same dummy state).
 *
 * May request that caller discard the message by setting *discardp to 1.
 * The returned state is not used in this case and is allowed to be NULL.
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, the last message in a command or reply sequence sets DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also sent.  Both the command side and reply side can
 * degenerate into a single message with both CREATE and DELETE set.  Note
 * that one side can be streaming and the other side not, or neither, or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * The message may be running over a circuit.  If the circuit is half-deleted,
 * the message is typically racing against a link failure and must be thrown
 * out.  As the circuit deletion propagates the library will automatically
 * generate terminations for sub states.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is
 * even if the command or reply side has already sent a DELETE, as long as
 * the message has not been fully closed it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race we silently discard ABORTs for message
 * state which has already been fully closed.
REPLY+ABORT+DELETEs can 2050 * also race, and in this situation the other side might have already 2051 * initiated a new unrelated command with the same message id. Since 2052 * the abort has not set the CREATE flag the situation can be detected 2053 * and the message will also be discarded. 2054 * 2055 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE]. 2056 * The ABORT request is essentially integrated into the command instead 2057 * of being sent later on. In this situation the command implementation 2058 * detects that CREATE and ABORT are both set (vs ABORT alone) and can 2059 * special-case non-blocking operation for the command. 2060 * 2061 * NOTE! Messages with ABORT set without CREATE or DELETE are considered 2062 * to be mid-stream aborts for command/reply sequences. ABORTs on 2063 * one-way messages are not supported. 2064 * 2065 * NOTE! If a command sequence does not support aborts the ABORT flag is 2066 * simply ignored. 2067 * 2068 * -- 2069 * 2070 * One-off messages (no reply expected) are sent without an established 2071 * transaction. CREATE and DELETE are left clear and the msgid is usually 0. 2072 * For one-off messages sent over circuits msgid generally MUST be 0. 2073 * 2074 * One-off messages cannot be aborted and typically aren't processed 2075 * by these routines. Order is still guaranteed for messages sent over 2076 * the same circuit. The REPLY bit can be used to distinguish whether 2077 * a one-off message is a command or reply. For example, one-off replies 2078 * will typically just contain status updates. 
 */
/*
 * Process transactional state for a received message while holding the
 * iocom mutex.
 *
 * Resolves the governing circuit (pstate) and the per-msgid transaction
 * (state) for the message, allocates new state on CREATE, and flags /
 * unindexes state on DELETE once both directions have closed.
 *
 * msg    - the received message (msg->state is updated in place)
 * mstate - non-zero when the state was already attached to the message
 *          by the simulated-failure/abort path (state may already be
 *          off the RB trees in that case)
 *
 * Returns 0 on success or a DMSG_IOQ_ERROR_* code.
 */
static int
dmsg_state_msgrx(dmsg_msg_t *msg, int mstate)
{
	dmsg_iocom_t *iocom = msg->state->iocom;
	dmsg_state_t *state;
	dmsg_state_t *pstate;		/* governing circuit (parent) state */
	dmsg_state_t sdummy;		/* stack key for RB_FIND lookups */
	int error;

	pthread_mutex_lock(&iocom->mtx);

	if (DMsgDebugOpt) {
		dmio_printf(iocom, 5,
			    "msgrx: cmd=%08x msgid=%016jx "
			    "circuit=%016jx error=%d\n",
			    msg->any.head.cmd,
			    msg->any.head.msgid,
			    msg->any.head.circuit,
			    msg->any.head.error);
	}

	/*
	 * Lookup the circuit (pstate).  The circuit will be an open
	 * transaction.  The REVCIRC bit in the message tells us which side
	 * initiated it.
	 *
	 * If mstate is non-zero the state has already been incorporated
	 * into the message as part of a simulated abort.  Note that in this
	 * situation the parent state may have already been removed from
	 * the RBTREE.
	 */
	if (mstate) {
		pstate = msg->state->parent;
	} else if (msg->any.head.circuit) {
		sdummy.msgid = msg->any.head.circuit;

		if (msg->any.head.cmd & DMSGF_REVCIRC) {
			pstate = RB_FIND(dmsg_state_tree,
					 &iocom->statewr_tree,
					 &sdummy);
		} else {
			pstate = RB_FIND(dmsg_state_tree,
					 &iocom->staterd_tree,
					 &sdummy);
		}

		/*
		 * If we cannot find the circuit throw the message away.
		 * The state will have already been taken care of by
		 * the simulated failure code.  This case can occur due
		 * to a failure propagating in one direction crossing a
		 * request on the failed circuit propagating in the other
		 * direction.
		 */
		if (pstate == NULL) {
			dmio_printf(iocom, 4,
				    "missing parent in stacked trans %s\n",
				    dmsg_msg_str(msg));
			pthread_mutex_unlock(&iocom->mtx);
			error = DMSG_IOQ_ERROR_EALREADY;

			return error;
		}
	} else {
		/* no circuit specified, message is on the root circuit */
		pstate = &iocom->state0;
	}
	/* WARNING: pstate not (yet) refd */

	/*
	 * Lookup the msgid.
	 *
	 * If mstate is non-zero the state has already been incorporated
	 * into the message as part of a simulated abort.  Note that in this
	 * situation the state may have already been removed from the RBTREE.
	 *
	 * If received msg is a command state is on staterd_tree.
	 * If received msg is a reply state is on statewr_tree.
	 * Otherwise there is no state (retain &iocom->state0)
	 */
	if (mstate) {
		state = msg->state;
	} else {
		sdummy.msgid = msg->any.head.msgid;
		if (msg->any.head.cmd & DMSGF_REVTRANS) {
			state = RB_FIND(dmsg_state_tree,
					&iocom->statewr_tree, &sdummy);
		} else {
			state = RB_FIND(dmsg_state_tree,
					&iocom->staterd_tree, &sdummy);
		}
	}

	if (DMsgDebugOpt) {
		dmio_printf(iocom, 5, "msgrx:\tstate %p(%08x)",
			    state, (state ? state->icmd : 0));
		if (pstate != &iocom->state0) {
			dmio_printf(iocom, 5,
				    " pstate %p(%08x)",
				    pstate, pstate->icmd);
		}
		dmio_printf(iocom, 5, "%s\n", "");
	}

	if (mstate) {
		/* state already assigned to msg */
	} else if (state) {
		/*
		 * Message over an existing transaction (CREATE should not
		 * be set).  Swap the placeholder state ref for the real
		 * transaction state.
		 */
		dmsg_state_drop(msg->state);
		dmsg_state_hold(state);
		msg->state = state;
		assert(pstate == state->parent);
	} else {
		/*
		 * Either a new transaction (if CREATE set) or a one-off.
		 */
		state = pstate;
	}

	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.
	 *
	 * Construct new state as necessary.
	 */
	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				    DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * Create new sub-transaction under pstate.
		 * (any DELETE is handled in post-processing of msg).
		 *
		 * (During routing the msgid was made unique for this
		 * direction over the comlink, so our RB trees can be
		 * iocom-based instead of state-based).
		 */
		if (state != pstate) {
			dmio_printf(iocom, 2,
				    "duplicate transaction %s\n",
				    dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}

		/*
		 * Allocate the new state.
		 */
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		atomic_add_int(&dmsg_state_count, 1);

		TAILQ_INIT(&state->subq);
		dmsg_state_hold(pstate);
		state->parent = pstate;
		state->iocom = iocom;
		/* OPPOSITE: transaction was initiated by the remote end */
		state->flags = DMSG_STATE_DYNAMIC |
			       DMSG_STATE_OPPOSITE;
		state->msgid = msg->any.head.msgid;
		state->txcmd = DMSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
		state->flags &= ~DMSG_STATE_NEW;
		msg->state = state;

		RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
		if (TAILQ_EMPTY(&pstate->subq))
			dmsg_state_hold(pstate);/* pstate->subq */
		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
		state->flags |= DMSG_STATE_SUBINSERTED |
				DMSG_STATE_RBINSERTED;
		dmsg_state_hold(state);		/* pstate->subq */
		dmsg_state_hold(state);		/* state on rbtree */
		dmsg_state_hold(state);		/* msg->state */

		/*
		 * If the parent is a relay set up the state handler to
		 * automatically route the message.  Local processing will
		 * not occur if set.
		 *
		 * (state relays are seeded by SPAN processing)
		 */
		if (pstate->relay)
			state->func = dmsg_state_relay;
		error = 0;
		break;
	case DMSGF_DELETE:
		/*
		 * Persistent state is expected but might not exist if an
		 * ABORT+DELETE races the close.
		 *
		 * (any DELETE is handled in post-processing of msg).
		 */
		if (state == pstate) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				dmio_printf(iocom, 2,
					    "missing-state %s\n",
					    dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Handle another ABORT+DELETE case if the msgid has already
		 * been reused.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				dmio_printf(iocom, 2,
					    "reused-state %s\n",
					    dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	default:
		/*
		 * Mid-stream message (no CREATE/DELETE/REPLY flags).
		 * Check for mid-stream ABORT command received, otherwise
		 * allow.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if ((state == pstate) ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * When receiving a reply with CREATE set the original
		 * persistent state message should already exist.
		 */
		if (state == pstate) {
			dmio_printf(iocom, 2, "no-state(r) %s\n",
				    dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == pstate) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				dmio_printf(iocom, 2,
					    "no-state(r,d) %s\n",
					    dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				dmio_printf(iocom, 2,
					    "reused-state(r,d) %s\n",
					    dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply received to sent command.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == pstate ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}

	/*
	 * Calculate the easy-switch() transactional command.  Represents
	 * the outer-transaction command for any transaction-create or
	 * transaction-delete, and the inner message command for any
	 * non-transaction or inside-transaction command.  tcmd will be
	 * set to 0 for any messaging error condition.
	 *
	 * The two can be told apart because outer-transaction commands
	 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
	 */
	if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
		if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
			msg->tcmd = (state->icmd & DMSGF_BASECMDMASK) |
				    (msg->any.head.cmd & (DMSGF_CREATE |
							  DMSGF_DELETE |
							  DMSGF_REPLY));
		} else {
			msg->tcmd = 0;
		}
	} else {
		msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
	}

#ifdef DMSG_BLOCK_DEBUG
	/* debug tracing of block-I/O transactions only */
	switch (msg->tcmd) {
	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
		dmio_printf(iocom, 4,
			    "read BIO %-3d %016jx %d@%016jx\n",
			    biocount, msg->any.head.msgid,
			    msg->any.blk_read.bytes,
			    msg->any.blk_read.offset);
		break;
	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
		dmio_printf(iocom, 4,
			    "rread BIO %-3d %016jx %d@%016jx\n",
			    biocount, msg->any.head.msgid,
			    msg->any.blk_read.bytes,
			    msg->any.blk_read.offset);
		break;
	default:
		break;
	}
#endif

	/*
	 * Adjust state, mark receive side as DELETED if appropriate and
	 * adjust RB tree if both sides are DELETED.  cleanuprx handles
	 * the rest after the state callback returns.
	 */
	assert(msg->state->iocom == iocom);
	assert(msg->state == state);

	if (state->flags & DMSG_STATE_ROOT) {
		/*
		 * Nothing to do for non-transactional messages.
		 */
	} else if (msg->any.head.cmd & DMSGF_DELETE) {
		/*
		 * Message terminating transaction, remove the state from
		 * the RB tree if the full transaction is now complete.
		 * The related state, subq, and parent link is retained
		 * until after the state callback is complete.
		 *
		 * A received reply means we initiated the transaction, so
		 * the state lives on statewr_tree; a received command means
		 * the remote initiated it and the state is on staterd_tree.
		 */
		assert((state->rxcmd & DMSGF_DELETE) == 0);
		state->rxcmd |= DMSGF_DELETE;
		if (state->txcmd & DMSGF_DELETE) {
			assert(state->flags & DMSG_STATE_RBINSERTED);
			if (state->rxcmd & DMSGF_REPLY) {
				assert(msg->any.head.cmd & DMSGF_REPLY);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->statewr_tree, state);
			} else {
				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->staterd_tree, state);
			}
			state->flags &= ~DMSG_STATE_RBINSERTED;
			dmsg_state_drop(state);		/* state on rbtree */
		}
	}

	pthread_mutex_unlock(&iocom->mtx);

	if (DMsgDebugOpt && error)
		dmio_printf(iocom, 1, "msgrx: error %d\n", error);

	return (error);
}

/*
 * Route the message and handle pair-state processing.
 *
 * lmsg arrives on the local (near-side) state; a mirror message rmsg is
 * allocated on the paired relay state and written out.  Aux data
 * ownership is transferred from lmsg to rmsg.
 */
void
dmsg_state_relay(dmsg_msg_t *lmsg)
{
	dmsg_state_t *lpstate;		/* local parent (circuit) state */
	dmsg_state_t *rpstate;		/* remote parent (circuit) state */
	dmsg_state_t *lstate;		/* local transaction state */
	dmsg_state_t *rstate;		/* remote (relayed) transaction state */
	dmsg_msg_t *rmsg;

#ifdef DMSG_BLOCK_DEBUG
	/*
	 * NOTE(review): 'iocom' is not declared locally in this function in
	 * the visible source; presumably dmio_printf() resolves it via macro
	 * expansion or an outer declaration -- confirm against dmsg_local.h.
	 */
	switch (lmsg->tcmd) {
	case DMSG_BLK_OPEN | DMSGF_CREATE:
		dmio_printf(iocom, 4, "%s\n",
			    "relay BIO_OPEN (CREATE)");
		break;
	case DMSG_BLK_OPEN | DMSGF_DELETE:
		dmio_printf(iocom, 4, "%s\n",
			    "relay BIO_OPEN (DELETE)");
		break;
	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
		atomic_add_int(&biocount, 1);
		dmio_printf(iocom, 4,
			    "relay BIO %-3d %016jx %d@%016jx\n",
			    biocount, lmsg->any.head.msgid,
			    lmsg->any.blk_read.bytes,
			    lmsg->any.blk_read.offset);
		break;
	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
		dmio_printf(iocom, 4,
			    "retrn BIO %-3d %016jx %d@%016jx\n",
			    biocount, lmsg->any.head.msgid,
			    lmsg->any.blk_read.bytes,
			    lmsg->any.blk_read.offset);
		atomic_add_int(&biocount, -1);
		break;
	default:
		break;
	}
#endif

	if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
	    DMSGF_CREATE) {
		/*
		 * New sub-transaction, establish new state and relay.
		 * Cross-link the local and remote states (one ref each).
		 */
		lstate = lmsg->state;
		lpstate = lstate->parent;
		rpstate = lpstate->relay;
		assert(lstate->relay == NULL);
		assert(rpstate != NULL);

		rmsg = dmsg_msg_alloc(rpstate, 0,
				      lmsg->any.head.cmd,
				      dmsg_state_relay, NULL);
		rstate = rmsg->state;
		rstate->relay = lstate;
		lstate->relay = rstate;
		dmsg_state_hold(lstate);
		dmsg_state_hold(rstate);
	} else {
		/*
		 * State & relay already established
		 */
		lstate = lmsg->state;
		rstate = lstate->relay;
		assert(rstate != NULL);

		assert((rstate->txcmd & DMSGF_DELETE) == 0);

#if 0
		if (lstate->flags & DMSG_STATE_ABORTING) {
			dmio_printf(iocom, 4,
				    "relay: relay lost link l=%p r=%p\n",
				    lstate, rstate);
			dmsg_simulate_failure(rstate, 0, DMSG_ERR_LOSTLINK);
		}
#endif

		rmsg = dmsg_msg_alloc(rstate, 0,
				      lmsg->any.head.cmd,
				      dmsg_state_relay, NULL);
	}
	/* copy any extended-header payload beyond the core header */
	if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
		bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
		      lmsg->hdr_size - sizeof(lmsg->any.head));
	}
	rmsg->any.head.error = lmsg->any.head.error;
	rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
	rmsg->any.head.link_verifier = lmsg->any.head.link_verifier;
	/* transfer aux-data ownership to the relayed message */
	rmsg->aux_size = lmsg->aux_size;
	rmsg->aux_data = lmsg->aux_data;
	lmsg->aux_data = NULL;

	dmsg_msg_write(rmsg);
}

/*
 * Cleanup and retire msg after issuing the state callback.  The state
 * has already been removed from the RB tree.  The subq and msg must be
 * cleaned up.
 *
 * Called with the iocom mutex held (to handle subq disconnection).
 */
void
dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
{
	dmsg_state_t *state;

	assert(msg->state->iocom == iocom);
	state = msg->state;
	if (state->flags & DMSG_STATE_ROOT) {
		/*
		 * Free a non-transactional message, there is no state
		 * to worry about.
		 */
		dmsg_msg_free(msg);
	} else if ((state->flags & DMSG_STATE_SUBINSERTED) &&
		   (state->rxcmd & DMSGF_DELETE) &&
		   (state->txcmd & DMSGF_DELETE)) {
		/*
		 * Both directions are closed; must disconnect from parent
		 * and drop relay.
		 */
		dmsg_subq_delete(state);
		if (state->relay) {
			dmsg_state_drop(state->relay);
			state->relay = NULL;
		}
		dmsg_msg_free(msg);
	} else {
		/*
		 * Message not terminating transaction, leave state intact
		 * and free message if it isn't the CREATE message.
		 */
		dmsg_msg_free(msg);
	}
}

/*
 * Clean up the state after pulling out needed fields and queueing the
 * message for transmission.   This occurs in dmsg_msg_write().
 *
 * Called with the mutex locked.
 */
static void
dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
{
	dmsg_state_t *state;

	assert(iocom == msg->state->iocom);
	state = msg->state;

	/* temporary ref so the state survives the teardown below */
	dmsg_state_hold(state);

	if (state->flags & DMSG_STATE_ROOT) {
		;	/* non-transactional message, nothing to do */
	} else if (msg->any.head.cmd & DMSGF_DELETE) {
		/*
		 * Message terminating transaction, destroy the related
		 * state, the original message, and this message (if it
		 * isn't the original message due to a CREATE|DELETE).
		 *
		 * It's possible for governing state to terminate while
		 * sub-transactions still exist.  This is allowed but
		 * will cause sub-transactions to recursively fail.
		 * Further reception of sub-transaction messages will be
		 * impossible because the circuit will no longer exist.
		 * (XXX need code to make sure that happens properly).
		 *
		 * NOTE: It is possible for a failure to terminate the
		 *	 state after we have written the message but before
		 *	 we are able to call cleanuptx, so txcmd might
		 *	 already have DMSGF_DELETE set.
		 */
		if ((state->txcmd & DMSGF_DELETE) == 0 &&
		    (state->rxcmd & DMSGF_DELETE)) {
			state->txcmd |= DMSGF_DELETE;
			assert(state->flags & DMSG_STATE_RBINSERTED);
			/*
			 * Transmitting a reply means the remote initiated
			 * the transaction (state on staterd_tree);
			 * transmitting a command means we did (statewr_tree).
			 * This mirrors the rx-side removal in msgrx.
			 */
			if (state->txcmd & DMSGF_REPLY) {
				assert(msg->any.head.cmd & DMSGF_REPLY);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->staterd_tree, state);
			} else {
				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->statewr_tree, state);
			}
			state->flags &= ~DMSG_STATE_RBINSERTED;
			dmsg_subq_delete(state);

			if (state->relay) {
				dmsg_state_drop(state->relay);
				state->relay = NULL;
			}
			dmsg_state_drop(state);	/* state->rbtree */
		} else if ((state->txcmd & DMSGF_DELETE) == 0) {
			/* rx side still open, just flag the tx side closed */
			state->txcmd |= DMSGF_DELETE;
		}
	}

	/*
	 * Deferred abort after transmission.
	 */
	if ((state->flags & (DMSG_STATE_ABORTING | DMSG_STATE_DYING)) &&
	    (state->rxcmd & DMSGF_DELETE) == 0) {
		dmio_printf(iocom, 4,
			    "cleanuptx: state=%p "
			    "executing deferred abort\n",
			    state);
		state->flags &= ~DMSG_STATE_ABORTING;
		dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
	}

	dmsg_state_drop(state);		/* temporary ref from above */
}

/*
 * Called with or without locks.
 *
 * Acquire a reference on the state.
 */
void
dmsg_state_hold(dmsg_state_t *state)
{
	atomic_add_int(&state->refs, 1);
}

/*
 * Release a reference; frees the state when the last reference is dropped.
 */
void
dmsg_state_drop(dmsg_state_t *state)
{
	assert(state->refs > 0);
	if (atomic_fetchadd_int(&state->refs, -1) == 1)
		dmsg_state_free(state);
}

/*
 * Called with iocom locked.
 *
 * Final destruction of a state whose refcount has reached zero.  All
 * indexing (RB tree, parent subq) must already be undone.
 */
static void
dmsg_state_free(dmsg_state_t *state)
{
	atomic_add_int(&dmsg_state_count, -1);
	dmio_printf(state->iocom, 5, "terminate state %p\n", state);
	assert((state->flags & (DMSG_STATE_ROOT |
				DMSG_STATE_SUBINSERTED |
				DMSG_STATE_RBINSERTED)) == 0);
	assert(TAILQ_EMPTY(&state->subq));
	assert(state->refs == 0);
	if (state->any.any != NULL)	/* XXX avoid deadlock w/exit & kernel */
		closefrom(3);
	assert(state->any.any == NULL);
	free(state);
}

/*
 * This swaps endian for a hammer2_msg_hdr.  Note that the extended
 * header is not adjusted, just the core header.
2740 */ 2741 void 2742 dmsg_bswap_head(dmsg_hdr_t *head) 2743 { 2744 head->magic = bswap16(head->magic); 2745 head->reserved02 = bswap16(head->reserved02); 2746 head->salt = bswap32(head->salt); 2747 2748 head->msgid = bswap64(head->msgid); 2749 head->circuit = bswap64(head->circuit); 2750 head->link_verifier= bswap64(head->link_verifier); 2751 2752 head->cmd = bswap32(head->cmd); 2753 head->aux_crc = bswap32(head->aux_crc); 2754 head->aux_bytes = bswap32(head->aux_bytes); 2755 head->error = bswap32(head->error); 2756 head->aux_descr = bswap64(head->aux_descr); 2757 head->reserved38= bswap32(head->reserved38); 2758 head->hdr_crc = bswap32(head->hdr_crc); 2759 } 2760