xref: /dragonfly/lib/libdmsg/msg.c (revision 092c2dd1)
1 /*
2  * Copyright (c) 2011-2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "dmsg_local.h"
37 
/*
 * Compile-time switch enabling the block I/O debug counter below.
 */
#define DMSG_BLOCK_DEBUG

/*
 * Debug verbosity option, exported to other modules.
 * NOTE(review): presumably consulted by dmio_printf(); not referenced
 * directly in the visible code.
 */
int DMsgDebugOpt;

/*
 * Count of dynamically allocated transaction states; incremented in
 * dmsg_msg_alloc_locked() (decrement presumably in the state-free path).
 */
static unsigned int dmsg_state_count;
#ifdef DMSG_BLOCK_DEBUG
/* Debug counter (NOTE(review): not referenced in the visible code). */
static unsigned int biocount;
#endif

/* Forward declarations for file-local helpers defined later. */
static int dmsg_state_msgrx(dmsg_msg_t *msg, int mstate);
static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
static void dmsg_msg_free_locked(dmsg_msg_t *msg);
static void dmsg_state_free(dmsg_state_t *state);
static void dmsg_subq_delete(dmsg_state_t *state);
static void dmsg_simulate_failure(dmsg_state_t *state, int meto, int error);
static void dmsg_state_abort(dmsg_state_t *state);
static void dmsg_state_dying(dmsg_state_t *state);

/* Instantiate the dmsg_state_tree red-black tree, ordered by dmsg_state_cmp(). */
RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
56 
57 /*
58  * STATE TREE - Represents open transactions which are indexed by their
59  *		{ msgid } relative to the governing iocom.
60  */
61 int
62 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
63 {
64 	if (state1->msgid < state2->msgid)
65 		return(-1);
66 	if (state1->msgid > state2->msgid)
67 		return(1);
68 	return(0);
69 }
70 
71 /*
72  * Initialize a low-level ioq
73  */
74 void
75 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
76 {
77 	bzero(ioq, sizeof(*ioq));
78 	ioq->state = DMSG_MSGQ_STATE_HEADER1;
79 	TAILQ_INIT(&ioq->msgq);
80 }
81 
82 /*
83  * Cleanup queue.
84  *
85  * caller holds iocom->mtx.
86  */
87 void
88 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
89 {
90 	dmsg_msg_t *msg;
91 
92 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
93 		assert(0);	/* shouldn't happen */
94 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
95 		dmsg_msg_free(msg);
96 	}
97 	if ((msg = ioq->msg) != NULL) {
98 		ioq->msg = NULL;
99 		dmsg_msg_free(msg);
100 	}
101 }
102 
103 /*
104  * Initialize a low-level communications channel.
105  *
106  * NOTE: The signal_func() is called at least once from the loop and can be
107  *	 re-armed via dmsg_iocom_restate().
108  */
109 void
110 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
111 		   void (*signal_func)(dmsg_iocom_t *iocom),
112 		   void (*rcvmsg_func)(dmsg_msg_t *msg),
113 		   void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
114 		   void (*altmsg_func)(dmsg_iocom_t *iocom))
115 {
116 	struct stat st;
117 
118 	bzero(iocom, sizeof(*iocom));
119 
120 	asprintf(&iocom->label, "iocom-%p", iocom);
121 	iocom->signal_callback = signal_func;
122 	iocom->rcvmsg_callback = rcvmsg_func;
123 	iocom->altmsg_callback = altmsg_func;
124 	iocom->usrmsg_callback = usrmsg_func;
125 
126 	pthread_mutex_init(&iocom->mtx, NULL);
127 	RB_INIT(&iocom->staterd_tree);
128 	RB_INIT(&iocom->statewr_tree);
129 	TAILQ_INIT(&iocom->txmsgq);
130 	iocom->sock_fd = sock_fd;
131 	iocom->alt_fd = alt_fd;
132 	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
133 	if (signal_func)
134 		iocom->flags |= DMSG_IOCOMF_SWORK;
135 	dmsg_ioq_init(iocom, &iocom->ioq_rx);
136 	dmsg_ioq_init(iocom, &iocom->ioq_tx);
137 	iocom->state0.refs = 1;		/* should never trigger a free */
138 	iocom->state0.iocom = iocom;
139 	iocom->state0.parent = &iocom->state0;
140 	iocom->state0.flags = DMSG_STATE_ROOT;
141 	TAILQ_INIT(&iocom->state0.subq);
142 
143 	if (pipe(iocom->wakeupfds) < 0)
144 		assert(0);
145 	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
146 	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
147 
148 	/*
149 	 * Negotiate session crypto synchronously.  This will mark the
150 	 * connection as error'd if it fails.  If this is a pipe it's
151 	 * a linkage that we set up ourselves to the filesystem and there
152 	 * is no crypto.
153 	 */
154 	if (fstat(sock_fd, &st) < 0)
155 		assert(0);
156 	if (S_ISSOCK(st.st_mode))
157 		dmsg_crypto_negotiate(iocom);
158 
159 	/*
160 	 * Make sure our fds are set to non-blocking for the iocom core.
161 	 */
162 	if (sock_fd >= 0)
163 		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
164 #if 0
165 	/* if line buffered our single fgets() should be fine */
166 	if (alt_fd >= 0)
167 		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
168 #endif
169 }
170 
171 void
172 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
173 {
174 	va_list va;
175 	char *optr;
176 
177 	va_start(va, ctl);
178 	optr = iocom->label;
179 	vasprintf(&iocom->label, ctl, va);
180 	va_end(va);
181 	if (optr)
182 		free(optr);
183 }
184 
185 /*
186  * May only be called from a callback from iocom_core.
187  *
188  * Adjust state machine functions, set flags to guarantee that both
189  * the recevmsg_func and the sendmsg_func is called at least once.
190  */
191 void
192 dmsg_iocom_restate(dmsg_iocom_t *iocom,
193 		   void (*signal_func)(dmsg_iocom_t *),
194 		   void (*rcvmsg_func)(dmsg_msg_t *msg))
195 {
196 	pthread_mutex_lock(&iocom->mtx);
197 	iocom->signal_callback = signal_func;
198 	iocom->rcvmsg_callback = rcvmsg_func;
199 	if (signal_func)
200 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
201 	else
202 		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
203 	pthread_mutex_unlock(&iocom->mtx);
204 }
205 
206 void
207 dmsg_iocom_signal(dmsg_iocom_t *iocom)
208 {
209 	pthread_mutex_lock(&iocom->mtx);
210 	if (iocom->signal_callback)
211 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
212 	pthread_mutex_unlock(&iocom->mtx);
213 }
214 
215 /*
216  * Cleanup a terminating iocom.
217  *
218  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
219  * from all possible references to it.
220  */
221 void
222 dmsg_iocom_done(dmsg_iocom_t *iocom)
223 {
224 	if (iocom->sock_fd >= 0) {
225 		close(iocom->sock_fd);
226 		iocom->sock_fd = -1;
227 	}
228 	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
229 		close(iocom->alt_fd);
230 		iocom->alt_fd = -1;
231 	}
232 	dmsg_ioq_done(iocom, &iocom->ioq_rx);
233 	dmsg_ioq_done(iocom, &iocom->ioq_tx);
234 	if (iocom->wakeupfds[0] >= 0) {
235 		close(iocom->wakeupfds[0]);
236 		iocom->wakeupfds[0] = -1;
237 	}
238 	if (iocom->wakeupfds[1] >= 0) {
239 		close(iocom->wakeupfds[1]);
240 		iocom->wakeupfds[1] = -1;
241 	}
242 	pthread_mutex_destroy(&iocom->mtx);
243 }
244 
245 /*
246  * Allocate a new message using the specified transaction state.
247  *
248  * If CREATE is set a new transaction is allocated relative to the passed-in
249  * transaction (the 'state' argument becomes pstate).
250  *
251  * If CREATE is not set the message is associated with the passed-in
252  * transaction.
253  */
254 dmsg_msg_t *
255 dmsg_msg_alloc(dmsg_state_t *state,
256 	       size_t aux_size, uint32_t cmd,
257 	       void (*func)(dmsg_msg_t *), void *data)
258 {
259 	dmsg_iocom_t *iocom = state->iocom;
260 	dmsg_msg_t *msg;
261 
262 	pthread_mutex_lock(&iocom->mtx);
263 	msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
264 	pthread_mutex_unlock(&iocom->mtx);
265 
266 	return msg;
267 }
268 
269 dmsg_msg_t *
270 dmsg_msg_alloc_locked(dmsg_state_t *state,
271 	       size_t aux_size, uint32_t cmd,
272 	       void (*func)(dmsg_msg_t *), void *data)
273 {
274 	dmsg_iocom_t *iocom = state->iocom;
275 	dmsg_state_t *pstate;
276 	dmsg_msg_t *msg;
277 	int hbytes;
278 	size_t aligned_size;
279 
280 	aligned_size = DMSG_DOALIGN(aux_size);
281 	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
282 		/*
283 		 * When CREATE is set without REPLY the caller is
284 		 * initiating a new transaction stacked under the specified
285 		 * circuit.
286 		 *
287 		 * It is possible to race a circuit failure, inherit the
288 		 * parent's STATE_DYING flag to trigger an abort sequence
289 		 * in the transmit path.  By not inheriting ABORTING the
290 		 * abort sequence can recurse.
291 		 *
292 		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
293 		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
294 		 */
295 		pstate = state;
296 		state = malloc(sizeof(*state));
297 		bzero(state, sizeof(*state));
298 		atomic_add_int(&dmsg_state_count, 1);
299 
300 		TAILQ_INIT(&state->subq);
301 		state->parent = pstate;
302 		state->iocom = iocom;
303 		state->flags = DMSG_STATE_DYNAMIC;
304 		state->msgid = (uint64_t)(uintptr_t)state;
305 		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
306 		state->rxcmd = DMSGF_REPLY;
307 		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
308 		state->func = func;
309 		state->any.any = data;
310 
311 		state->flags |= DMSG_STATE_SUBINSERTED |
312 				DMSG_STATE_RBINSERTED;
313 		state->flags |= pstate->flags & DMSG_STATE_DYING;
314 		if (TAILQ_EMPTY(&pstate->subq))
315 			dmsg_state_hold(pstate);
316 		RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
317 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
318 		dmsg_state_hold(state);		/* state on pstate->subq */
319 		dmsg_state_hold(state);		/* state on rbtree */
320 		dmsg_state_hold(state);		/* msg->state */
321 	} else {
322 		/*
323 		 * Otherwise the message is transmitted over the existing
324 		 * open transaction.
325 		 */
326 		pstate = state->parent;
327 		dmsg_state_hold(state);		/* msg->state */
328 	}
329 
330 	/* XXX SMP race for state */
331 	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
332 	assert((size_t)hbytes >= sizeof(struct dmsg_hdr));
333 	msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes);
334 	bzero(msg, offsetof(struct dmsg_msg, any.head));
335 
336 	/*
337 	 * [re]allocate the auxillary data buffer.  The caller knows that
338 	 * a size-aligned buffer will be allocated but we do not want to
339 	 * force the caller to zero any tail piece, so we do that ourself.
340 	 */
341 	if (msg->aux_size != aux_size) {
342 		if (msg->aux_data) {
343 			free(msg->aux_data);
344 			msg->aux_data = NULL;
345 			msg->aux_size = 0;
346 		}
347 		if (aux_size) {
348 			msg->aux_data = malloc(aligned_size);
349 			msg->aux_size = aux_size;
350 			if (aux_size != aligned_size) {
351 				bzero(msg->aux_data + aux_size,
352 				      aligned_size - aux_size);
353 			}
354 		}
355 	}
356 
357 	/*
358 	 * Set REVTRANS if the transaction was remotely initiated
359 	 * Set REVCIRC if the circuit was remotely initiated
360 	 */
361 	if (state->flags & DMSG_STATE_OPPOSITE)
362 		cmd |= DMSGF_REVTRANS;
363 	if (pstate->flags & DMSG_STATE_OPPOSITE)
364 		cmd |= DMSGF_REVCIRC;
365 
366 	/*
367 	 * Finish filling out the header.
368 	 */
369 	bzero(&msg->any.head, hbytes);
370 	msg->hdr_size = hbytes;
371 	msg->any.head.magic = DMSG_HDR_MAGIC;
372 	msg->any.head.cmd = cmd;
373 	msg->any.head.aux_descr = 0;
374 	msg->any.head.aux_crc = 0;
375 	msg->any.head.msgid = state->msgid;
376 	msg->any.head.circuit = pstate->msgid;
377 	msg->state = state;
378 
379 	return (msg);
380 }
381 
382 /*
383  * Free a message so it can be reused afresh.
384  *
385  * NOTE: aux_size can be 0 with a non-NULL aux_data.
386  */
387 static
388 void
389 dmsg_msg_free_locked(dmsg_msg_t *msg)
390 {
391 	dmsg_state_t *state;
392 
393 	if ((state = msg->state) != NULL) {
394 		dmsg_state_drop(state);
395 		msg->state = NULL;	/* safety */
396 	}
397 	if (msg->aux_data) {
398 		free(msg->aux_data);
399 		msg->aux_data = NULL;	/* safety */
400 	}
401 	msg->aux_size = 0;
402 	free (msg);
403 }
404 
405 void
406 dmsg_msg_free(dmsg_msg_t *msg)
407 {
408 	dmsg_iocom_t *iocom = msg->state->iocom;
409 
410 	pthread_mutex_lock(&iocom->mtx);
411 	dmsg_msg_free_locked(msg);
412 	pthread_mutex_unlock(&iocom->mtx);
413 }
414 
415 /*
416  * I/O core loop for an iocom.
417  *
418  * Thread localized, iocom->mtx not held.
419  */
420 void
421 dmsg_iocom_core(dmsg_iocom_t *iocom)
422 {
423 	struct pollfd fds[3];
424 	char dummybuf[256];
425 	dmsg_msg_t *msg;
426 	int timeout;
427 	int count;
428 	int wi;	/* wakeup pipe */
429 	int si;	/* socket */
430 	int ai;	/* alt bulk path socket */
431 
432 	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
433 		/*
434 		 * These iocom->flags are only manipulated within the
435 		 * context of the current thread.  However, modifications
436 		 * still require atomic ops.
437 		 */
438 		dmio_printf(iocom, 5, "iocom %p %08x\n",
439 			    iocom, iocom->flags);
440 		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
441 				     DMSG_IOCOMF_WWORK |
442 				     DMSG_IOCOMF_PWORK |
443 				     DMSG_IOCOMF_SWORK |
444 				     DMSG_IOCOMF_ARWORK |
445 				     DMSG_IOCOMF_AWWORK)) == 0) {
446 			/*
447 			 * Only poll if no immediate work is pending.
448 			 * Otherwise we are just wasting our time calling
449 			 * poll.
450 			 */
451 			timeout = 5000;
452 
453 			count = 0;
454 			wi = -1;
455 			si = -1;
456 			ai = -1;
457 
458 			/*
459 			 * Always check the inter-thread pipe, e.g.
460 			 * for iocom->txmsgq work.
461 			 */
462 			wi = count++;
463 			fds[wi].fd = iocom->wakeupfds[0];
464 			fds[wi].events = POLLIN;
465 			fds[wi].revents = 0;
466 
467 			/*
468 			 * Check the socket input/output direction as
469 			 * requested
470 			 */
471 			if (iocom->flags & (DMSG_IOCOMF_RREQ |
472 					    DMSG_IOCOMF_WREQ)) {
473 				si = count++;
474 				fds[si].fd = iocom->sock_fd;
475 				fds[si].events = 0;
476 				fds[si].revents = 0;
477 
478 				if (iocom->flags & DMSG_IOCOMF_RREQ)
479 					fds[si].events |= POLLIN;
480 				if (iocom->flags & DMSG_IOCOMF_WREQ)
481 					fds[si].events |= POLLOUT;
482 			}
483 
484 			/*
485 			 * Check the alternative fd for work.
486 			 */
487 			if (iocom->alt_fd >= 0) {
488 				ai = count++;
489 				fds[ai].fd = iocom->alt_fd;
490 				fds[ai].events = POLLIN;
491 				fds[ai].revents = 0;
492 			}
493 			poll(fds, count, timeout);
494 
495 			if (wi >= 0 && (fds[wi].revents & POLLIN))
496 				atomic_set_int(&iocom->flags,
497 					       DMSG_IOCOMF_PWORK);
498 			if (si >= 0 && (fds[si].revents & POLLIN))
499 				atomic_set_int(&iocom->flags,
500 					       DMSG_IOCOMF_RWORK);
501 			if (si >= 0 && (fds[si].revents & POLLOUT))
502 				atomic_set_int(&iocom->flags,
503 					       DMSG_IOCOMF_WWORK);
504 			if (wi >= 0 && (fds[wi].revents & POLLOUT))
505 				atomic_set_int(&iocom->flags,
506 					       DMSG_IOCOMF_WWORK);
507 			if (ai >= 0 && (fds[ai].revents & POLLIN))
508 				atomic_set_int(&iocom->flags,
509 					       DMSG_IOCOMF_ARWORK);
510 		} else {
511 			/*
512 			 * Always check the pipe
513 			 */
514 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
515 		}
516 
517 		if (iocom->flags & DMSG_IOCOMF_SWORK) {
518 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
519 			iocom->signal_callback(iocom);
520 		}
521 
522 		/*
523 		 * Pending message queues from other threads wake us up
524 		 * with a write to the wakeupfds[] pipe.  We have to clear
525 		 * the pipe with a dummy read.
526 		 */
527 		if (iocom->flags & DMSG_IOCOMF_PWORK) {
528 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
529 			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
530 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
531 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
532 		}
533 
534 		/*
535 		 * Message write sequencing
536 		 */
537 		if (iocom->flags & DMSG_IOCOMF_WWORK)
538 			dmsg_iocom_flush1(iocom);
539 
540 		/*
541 		 * Message read sequencing.  Run this after the write
542 		 * sequencing in case the write sequencing allowed another
543 		 * auto-DELETE to occur on the read side.
544 		 */
545 		if (iocom->flags & DMSG_IOCOMF_RWORK) {
546 			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
547 			       (msg = dmsg_ioq_read(iocom)) != NULL) {
548 				dmio_printf(iocom, 4, "receive %s\n",
549 					    dmsg_msg_str(msg));
550 				iocom->rcvmsg_callback(msg);
551 				pthread_mutex_lock(&iocom->mtx);
552 				dmsg_state_cleanuprx(iocom, msg);
553 				pthread_mutex_unlock(&iocom->mtx);
554 			}
555 		}
556 
557 		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
558 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
559 			iocom->altmsg_callback(iocom);
560 		}
561 	}
562 }
563 
564 /*
565  * Make sure there's enough room in the FIFO to hold the
566  * needed data.
567  *
568  * Assume worst case encrypted form is 2x the size of the
569  * plaintext equivalent.
570  */
571 static
572 size_t
573 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
574 {
575 	size_t bytes;
576 	size_t nmax;
577 
578 	bytes = ioq->fifo_cdx - ioq->fifo_beg;
579 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
580 	if (bytes + nmax / 2 < needed) {
581 		if (bytes) {
582 			bcopy(ioq->buf + ioq->fifo_beg,
583 			      ioq->buf,
584 			      bytes);
585 		}
586 		ioq->fifo_cdx -= ioq->fifo_beg;
587 		ioq->fifo_beg = 0;
588 		if (ioq->fifo_cdn < ioq->fifo_end) {
589 			bcopy(ioq->buf + ioq->fifo_cdn,
590 			      ioq->buf + ioq->fifo_cdx,
591 			      ioq->fifo_end - ioq->fifo_cdn);
592 		}
593 		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
594 		ioq->fifo_cdn = ioq->fifo_cdx;
595 		nmax = sizeof(ioq->buf) - ioq->fifo_end;
596 	}
597 	return(nmax);
598 }
599 
600 /*
601  * Read the next ready message from the ioq, issuing I/O if needed.
602  * Caller should retry on a read-event when NULL is returned.
603  *
604  * If an error occurs during reception a DMSG_LNK_ERROR msg will
605  * be returned for each open transaction, then the ioq and iocom
606  * will be errored out and a non-transactional DMSG_LNK_ERROR
607  * msg will be returned as the final message.  The caller should not call
608  * us again after the final message is returned.
609  *
610  * Thread localized, iocom->mtx not held.
611  */
612 dmsg_msg_t *
613 dmsg_ioq_read(dmsg_iocom_t *iocom)
614 {
615 	dmsg_ioq_t *ioq = &iocom->ioq_rx;
616 	dmsg_msg_t *msg;
617 	dmsg_hdr_t *head;
618 	ssize_t n;
619 	size_t bytes;
620 	size_t nmax;
621 	uint32_t aux_size;
622 	uint32_t xcrc32;
623 	int error;
624 
625 again:
626 	/*
627 	 * If a message is already pending we can just remove and
628 	 * return it.  Message state has already been processed.
629 	 * (currently not implemented)
630 	 */
631 	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
632 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
633 
634 		if (msg->state == &iocom->state0) {
635 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
636 			dmio_printf(iocom, 1,
637 				    "EOF ON SOCKET %d\n",
638 				    iocom->sock_fd);
639 		}
640 		return (msg);
641 	}
642 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
643 
644 	/*
645 	 * If the stream is errored out we stop processing it.
646 	 */
647 	if (ioq->error)
648 		goto skip;
649 
650 	/*
651 	 * Message read in-progress (msg is NULL at the moment).  We don't
652 	 * allocate a msg until we have its core header.
653 	 */
654 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
655 	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
656 	msg = ioq->msg;
657 
658 	switch(ioq->state) {
659 	case DMSG_MSGQ_STATE_HEADER1:
660 		/*
661 		 * Load the primary header, fail on any non-trivial read
662 		 * error or on EOF.  Since the primary header is the same
663 		 * size is the message alignment it will never straddle
664 		 * the end of the buffer.
665 		 */
666 		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
667 		if (bytes < sizeof(msg->any.head)) {
668 			n = read(iocom->sock_fd,
669 				 ioq->buf + ioq->fifo_end,
670 				 nmax);
671 			if (n <= 0) {
672 				if (n == 0) {
673 					ioq->error = DMSG_IOQ_ERROR_EOF;
674 					break;
675 				}
676 				if (errno != EINTR &&
677 				    errno != EINPROGRESS &&
678 				    errno != EAGAIN) {
679 					ioq->error = DMSG_IOQ_ERROR_SOCK;
680 					break;
681 				}
682 				n = 0;
683 				/* fall through */
684 			}
685 			ioq->fifo_end += (size_t)n;
686 			nmax -= (size_t)n;
687 		}
688 
689 		/*
690 		 * Decrypt data received so far.  Data will be decrypted
691 		 * in-place but might create gaps in the FIFO.  Partial
692 		 * blocks are not immediately decrypted.
693 		 *
694 		 * WARNING!  The header might be in the wrong endian, we
695 		 *	     do not fix it up until we get the entire
696 		 *	     extended header.
697 		 */
698 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
699 			dmsg_crypto_decrypt(iocom, ioq);
700 		} else {
701 			ioq->fifo_cdx = ioq->fifo_end;
702 			ioq->fifo_cdn = ioq->fifo_end;
703 		}
704 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
705 
706 		/*
707 		 * Insufficient data accumulated (msg is NULL, caller will
708 		 * retry on event).
709 		 */
710 		assert(msg == NULL);
711 		if (bytes < sizeof(msg->any.head))
712 			break;
713 
714 		/*
715 		 * Check and fixup the core header.  Note that the icrc
716 		 * has to be calculated before any fixups, but the crc
717 		 * fields in the msg may have to be swapped like everything
718 		 * else.
719 		 */
720 		head = (void *)(ioq->buf + ioq->fifo_beg);
721 		if (head->magic != DMSG_HDR_MAGIC &&
722 		    head->magic != DMSG_HDR_MAGIC_REV) {
723 			dmio_printf(iocom, 1,
724 				    "%s: head->magic is bad %02x\n",
725 				    iocom->label, head->magic);
726 			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
727 				dmio_printf(iocom, 1, "%s\n",
728 					    "(on encrypted link)");
729 			ioq->error = DMSG_IOQ_ERROR_SYNC;
730 			break;
731 		}
732 
733 		/*
734 		 * Calculate the full header size and aux data size
735 		 */
736 		if (head->magic == DMSG_HDR_MAGIC_REV) {
737 			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
738 				      DMSG_ALIGN;
739 			aux_size = bswap32(head->aux_bytes);
740 		} else {
741 			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
742 				      DMSG_ALIGN;
743 			aux_size = head->aux_bytes;
744 		}
745 		ioq->abytes = DMSG_DOALIGN(aux_size);
746 		ioq->unaligned_aux_size = aux_size;
747 		if (ioq->hbytes < sizeof(msg->any.head) ||
748 		    ioq->hbytes > sizeof(msg->any) ||
749 		    ioq->abytes > DMSG_AUX_MAX) {
750 			ioq->error = DMSG_IOQ_ERROR_FIELD;
751 			break;
752 		}
753 
754 		/*
755 		 * Allocate the message, the next state will fill it in.
756 		 *
757 		 * NOTE: The aux_data buffer will be sized to an aligned
758 		 *	 value and the aligned remainder zero'd for
759 		 *	 convenience.
760 		 *
761 		 * NOTE: Supply dummy state and a degenerate cmd without
762 		 *	 CREATE set.  The message will temporarily be
763 		 *	 associated with state0 until later post-processing.
764 		 */
765 		msg = dmsg_msg_alloc(&iocom->state0, aux_size,
766 				     ioq->hbytes / DMSG_ALIGN,
767 				     NULL, NULL);
768 		ioq->msg = msg;
769 
770 		/*
771 		 * Fall through to the next state.  Make sure that the
772 		 * extended header does not straddle the end of the buffer.
773 		 * We still want to issue larger reads into our buffer,
774 		 * book-keeping is easier if we don't bcopy() yet.
775 		 *
776 		 * Make sure there is enough room for bloated encrypt data.
777 		 */
778 		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
779 		ioq->state = DMSG_MSGQ_STATE_HEADER2;
780 		/* fall through */
781 	case DMSG_MSGQ_STATE_HEADER2:
782 		/*
783 		 * Fill out the extended header.
784 		 */
785 		assert(msg != NULL);
786 		if (bytes < ioq->hbytes) {
787 			assert(nmax > 0);
788 			n = read(iocom->sock_fd,
789 				 ioq->buf + ioq->fifo_end,
790 				 nmax);
791 			if (n <= 0) {
792 				if (n == 0) {
793 					ioq->error = DMSG_IOQ_ERROR_EOF;
794 					break;
795 				}
796 				if (errno != EINTR &&
797 				    errno != EINPROGRESS &&
798 				    errno != EAGAIN) {
799 					ioq->error = DMSG_IOQ_ERROR_SOCK;
800 					break;
801 				}
802 				n = 0;
803 				/* fall through */
804 			}
805 			ioq->fifo_end += (size_t)n;
806 			nmax -= (size_t)n;
807 		}
808 
809 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
810 			dmsg_crypto_decrypt(iocom, ioq);
811 		} else {
812 			ioq->fifo_cdx = ioq->fifo_end;
813 			ioq->fifo_cdn = ioq->fifo_end;
814 		}
815 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
816 
817 		/*
818 		 * Insufficient data accumulated (set msg NULL so caller will
819 		 * retry on event).
820 		 */
821 		if (bytes < ioq->hbytes) {
822 			msg = NULL;
823 			break;
824 		}
825 
826 		/*
827 		 * Calculate the extended header, decrypt data received
828 		 * so far.  Handle endian-conversion for the entire extended
829 		 * header.
830 		 */
831 		head = (void *)(ioq->buf + ioq->fifo_beg);
832 
833 		/*
834 		 * Check the CRC.
835 		 */
836 		if (head->magic == DMSG_HDR_MAGIC_REV)
837 			xcrc32 = bswap32(head->hdr_crc);
838 		else
839 			xcrc32 = head->hdr_crc;
840 		head->hdr_crc = 0;
841 		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
842 			ioq->error = DMSG_IOQ_ERROR_XCRC;
843 			dmio_printf(iocom, 1, "BAD-XCRC(%08x,%08x) %s\n",
844 				    xcrc32, dmsg_icrc32(head, ioq->hbytes),
845 				    dmsg_msg_str(msg));
846 			assert(0);
847 			break;
848 		}
849 		head->hdr_crc = xcrc32;
850 
851 		if (head->magic == DMSG_HDR_MAGIC_REV) {
852 			dmsg_bswap_head(head);
853 		}
854 
855 		/*
856 		 * Copy the extended header into the msg and adjust the
857 		 * FIFO.
858 		 */
859 		bcopy(head, &msg->any, ioq->hbytes);
860 
861 		/*
862 		 * We are either done or we fall-through.
863 		 */
864 		if (ioq->abytes == 0) {
865 			ioq->fifo_beg += ioq->hbytes;
866 			break;
867 		}
868 
869 		/*
870 		 * Must adjust bytes (and the state) when falling through.
871 		 * nmax doesn't change.
872 		 */
873 		ioq->fifo_beg += ioq->hbytes;
874 		bytes -= ioq->hbytes;
875 		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
876 		/* fall through */
877 	case DMSG_MSGQ_STATE_AUXDATA1:
878 		/*
879 		 * Copy the partial or complete [decrypted] payload from
880 		 * remaining bytes in the FIFO in order to optimize the
881 		 * makeroom call in the AUXDATA2 state.  We have to
882 		 * fall-through either way so we can check the crc.
883 		 *
884 		 * msg->aux_size tracks our aux data.
885 		 *
886 		 * (Lets not complicate matters if the data is encrypted,
887 		 *  since the data in-stream is not the same size as the
888 		 *  data decrypted).
889 		 */
890 		if (bytes >= ioq->abytes) {
891 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
892 			      ioq->abytes);
893 			msg->aux_size = ioq->abytes;
894 			ioq->fifo_beg += ioq->abytes;
895 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
896 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
897 			bytes -= ioq->abytes;
898 		} else if (bytes) {
899 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
900 			      bytes);
901 			msg->aux_size = bytes;
902 			ioq->fifo_beg += bytes;
903 			if (ioq->fifo_cdx < ioq->fifo_beg)
904 				ioq->fifo_cdx = ioq->fifo_beg;
905 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
906 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
907 			bytes = 0;
908 		} else {
909 			msg->aux_size = 0;
910 		}
911 		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
912 		/* fall through */
913 	case DMSG_MSGQ_STATE_AUXDATA2:
914 		/*
915 		 * Make sure there is enough room for more data.
916 		 */
917 		assert(msg);
918 		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
919 
920 		/*
921 		 * Read and decrypt more of the payload.
922 		 */
923 		if (msg->aux_size < ioq->abytes) {
924 			assert(nmax > 0);
925 			assert(bytes == 0);
926 			n = read(iocom->sock_fd,
927 				 ioq->buf + ioq->fifo_end,
928 				 nmax);
929 			if (n <= 0) {
930 				if (n == 0) {
931 					ioq->error = DMSG_IOQ_ERROR_EOF;
932 					break;
933 				}
934 				if (errno != EINTR &&
935 				    errno != EINPROGRESS &&
936 				    errno != EAGAIN) {
937 					ioq->error = DMSG_IOQ_ERROR_SOCK;
938 					break;
939 				}
940 				n = 0;
941 				/* fall through */
942 			}
943 			ioq->fifo_end += (size_t)n;
944 			nmax -= (size_t)n;
945 		}
946 
947 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
948 			dmsg_crypto_decrypt(iocom, ioq);
949 		} else {
950 			ioq->fifo_cdx = ioq->fifo_end;
951 			ioq->fifo_cdn = ioq->fifo_end;
952 		}
953 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
954 
955 		if (bytes > ioq->abytes - msg->aux_size)
956 			bytes = ioq->abytes - msg->aux_size;
957 
958 		if (bytes) {
959 			bcopy(ioq->buf + ioq->fifo_beg,
960 			      msg->aux_data + msg->aux_size,
961 			      bytes);
962 			msg->aux_size += bytes;
963 			ioq->fifo_beg += bytes;
964 		}
965 
966 		/*
967 		 * Insufficient data accumulated (set msg NULL so caller will
968 		 * retry on event).
969 		 *
970 		 * Assert the auxillary data size is correct, then record the
971 		 * original unaligned size from the message header.
972 		 */
973 		if (msg->aux_size < ioq->abytes) {
974 			msg = NULL;
975 			break;
976 		}
977 		assert(msg->aux_size == ioq->abytes);
978 		msg->aux_size = ioq->unaligned_aux_size;
979 
980 		/*
981 		 * Check aux_crc, then we are done.  Note that the crc
982 		 * is calculated over the aligned size, not the actual
983 		 * size.
984 		 */
985 		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
986 		if (xcrc32 != msg->any.head.aux_crc) {
987 			ioq->error = DMSG_IOQ_ERROR_ACRC;
988 			dmio_printf(iocom, 1,
989 				    "iocom: ACRC error %08x vs %08x "
990 				    "msgid %016jx msgcmd %08x auxsize %d\n",
991 				    xcrc32,
992 				    msg->any.head.aux_crc,
993 				    (intmax_t)msg->any.head.msgid,
994 				    msg->any.head.cmd,
995 				    msg->any.head.aux_bytes);
996 			break;
997 		}
998 		break;
999 	case DMSG_MSGQ_STATE_ERROR:
1000 		/*
1001 		 * Continued calls to drain recorded transactions (returning
1002 		 * a LNK_ERROR for each one), before we return the final
1003 		 * LNK_ERROR.
1004 		 */
1005 		assert(msg == NULL);
1006 		break;
1007 	default:
1008 		/*
1009 		 * We don't double-return errors, the caller should not
1010 		 * have called us again after getting an error msg.
1011 		 */
1012 		assert(0);
1013 		break;
1014 	}
1015 
1016 	/*
1017 	 * Check the message sequence.  The iv[] should prevent any
1018 	 * possibility of a replay but we add this check anyway.
1019 	 */
1020 	if (msg && ioq->error == 0) {
1021 		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1022 			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1023 		} else {
1024 			++ioq->seq;
1025 		}
1026 	}
1027 
1028 	/*
1029 	 * Handle error, RREQ, or completion
1030 	 *
1031 	 * NOTE: nmax and bytes are invalid at this point, we don't bother
1032 	 *	 to update them when breaking out.
1033 	 */
1034 	if (ioq->error) {
1035 skip:
1036 		/*
1037 		 * An unrecoverable error causes all active receive
1038 		 * transactions to be terminated with a LNK_ERROR message.
1039 		 *
1040 		 * Once all active transactions are exhausted we set the
1041 		 * iocom ERROR flag and return a non-transactional LNK_ERROR
1042 		 * message, which should cause master processing loops to
1043 		 * terminate.
1044 		 */
1045 		dmio_printf(iocom, 1, "IOQ ERROR %d\n", ioq->error);
1046 		assert(ioq->msg == msg);
1047 		if (msg) {
1048 			dmsg_msg_free(msg);
1049 			ioq->msg = NULL;
1050 			msg = NULL;
1051 		}
1052 
1053 		/*
1054 		 * No more I/O read processing
1055 		 */
1056 		ioq->state = DMSG_MSGQ_STATE_ERROR;
1057 
1058 		/*
1059 		 * Simulate a remote LNK_ERROR DELETE msg for any open
1060 		 * transactions, ending with a final non-transactional
1061 		 * LNK_ERROR (that the session can detect) when no
1062 		 * transactions remain.
1063 		 *
1064 		 * NOTE: Temporarily supply state0 and a degenerate cmd
1065 		 *	 without CREATE set.  The real state will be
1066 		 *	 assigned in the loop.
1067 		 *
1068 		 * NOTE: We are simulating a received message using our
1069 		 *	 side of the state, so the DMSGF_REV* bits have
1070 		 *	 to be reversed.
1071 		 */
1072 		pthread_mutex_lock(&iocom->mtx);
1073 		dmsg_iocom_drain(iocom);
1074 		dmsg_simulate_failure(&iocom->state0, 0, ioq->error);
1075 		pthread_mutex_unlock(&iocom->mtx);
1076 		if (TAILQ_FIRST(&ioq->msgq))
1077 			goto again;
1078 
1079 #if 0
1080 		/*
1081 		 * For the iocom error case we want to set RWORK to indicate
1082 		 * that more messages might be pending.
1083 		 *
1084 		 * It is possible to return NULL when there is more work to
1085 		 * do because each message has to be DELETEd in both
1086 		 * directions before we continue on with the next (though
1087 		 * this could be optimized).  The transmit direction will
1088 		 * re-set RWORK.
1089 		 */
1090 		if (msg)
1091 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1092 #endif
1093 	} else if (msg == NULL) {
1094 		/*
1095 		 * Insufficient data received to finish building the message,
1096 		 * set RREQ and return NULL.
1097 		 *
1098 		 * Leave ioq->msg intact.
1099 		 * Leave the FIFO intact.
1100 		 */
1101 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1102 	} else {
1103 		/*
1104 		 * Continue processing msg.
1105 		 *
1106 		 * The fifo has already been advanced past the message.
1107 		 * Trivially reset the FIFO indices if possible.
1108 		 *
1109 		 * clear the FIFO if it is now empty and set RREQ to wait
1110 		 * for more from the socket.  If the FIFO is not empty set
1111 		 * TWORK to bypass the poll so we loop immediately.
1112 		 */
1113 		if (ioq->fifo_beg == ioq->fifo_cdx &&
1114 		    ioq->fifo_cdn == ioq->fifo_end) {
1115 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1116 			ioq->fifo_cdx = 0;
1117 			ioq->fifo_cdn = 0;
1118 			ioq->fifo_beg = 0;
1119 			ioq->fifo_end = 0;
1120 		} else {
1121 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1122 		}
1123 		ioq->state = DMSG_MSGQ_STATE_HEADER1;
1124 		ioq->msg = NULL;
1125 
1126 		/*
1127 		 * Handle message routing.  Validates non-zero sources
1128 		 * and routes message.  Error will be 0 if the message is
1129 		 * destined for us.
1130 		 *
1131 		 * State processing only occurs for messages destined for us.
1132 		 */
1133 		dmio_printf(iocom, 5,
1134 			    "rxmsg cmd=%08x circ=%016jx\n",
1135 			    msg->any.head.cmd,
1136 			    (intmax_t)msg->any.head.circuit);
1137 
1138 		error = dmsg_state_msgrx(msg, 0);
1139 
1140 		if (error) {
1141 			/*
1142 			 * Abort-after-closure, throw message away and
1143 			 * start reading another.
1144 			 */
1145 			if (error == DMSG_IOQ_ERROR_EALREADY) {
1146 				dmsg_msg_free(msg);
1147 				goto again;
1148 			}
1149 
1150 			/*
1151 			 * Process real error and throw away message.
1152 			 */
1153 			ioq->error = error;
1154 			goto skip;
1155 		}
1156 
1157 		/*
1158 		 * No error and not routed
1159 		 */
1160 		/* no error, not routed.  Fall through and return msg */
1161 	}
1162 	return (msg);
1163 }
1164 
1165 /*
1166  * Calculate the header and data crc's and write a low-level message to
1167  * the connection.  If aux_crc is non-zero the aux_data crc is already
1168  * assumed to have been set.
1169  *
1170  * A non-NULL msg is added to the queue but not necessarily flushed.
1171  * Calling this function with msg == NULL will get a flush going.
1172  *
1173  * (called from iocom_core only)
1174  */
1175 void
1176 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1177 {
1178 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1179 	dmsg_msg_t *msg;
1180 	uint32_t xcrc32;
1181 	size_t hbytes;
1182 	size_t abytes;
1183 	dmsg_msg_queue_t tmpq;
1184 
1185 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1186 	TAILQ_INIT(&tmpq);
1187 	pthread_mutex_lock(&iocom->mtx);
1188 	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1189 		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1190 		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1191 	}
1192 	pthread_mutex_unlock(&iocom->mtx);
1193 
1194 	/*
1195 	 * Flush queue, doing all required encryption and CRC generation,
1196 	 * with the mutex unlocked.
1197 	 */
1198 	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1199 		/*
1200 		 * Process terminal connection errors.
1201 		 */
1202 		TAILQ_REMOVE(&tmpq, msg, qentry);
1203 		if (ioq->error) {
1204 			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1205 			++ioq->msgcount;
1206 			continue;
1207 		}
1208 
1209 		/*
1210 		 * Finish populating the msg fields.  The salt ensures that
1211 		 * the iv[] array is ridiculously randomized and we also
1212 		 * re-seed our PRNG every 32768 messages just to be sure.
1213 		 */
1214 		msg->any.head.magic = DMSG_HDR_MAGIC;
1215 		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1216 		++ioq->seq;
1217 		if ((ioq->seq & 32767) == 0) {
1218 			pthread_mutex_lock(&iocom->mtx);
1219 			srandomdev();
1220 			pthread_mutex_unlock(&iocom->mtx);
1221 		}
1222 
1223 		/*
1224 		 * Calculate aux_crc if 0, then calculate hdr_crc.
1225 		 */
1226 		if (msg->aux_size && msg->any.head.aux_crc == 0) {
1227 			abytes = DMSG_DOALIGN(msg->aux_size);
1228 			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1229 			msg->any.head.aux_crc = xcrc32;
1230 		}
1231 		msg->any.head.aux_bytes = msg->aux_size;
1232 
1233 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1234 			 DMSG_ALIGN;
1235 		msg->any.head.hdr_crc = 0;
1236 		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1237 
1238 		/*
1239 		 * Enqueue the message (the flush codes handles stream
1240 		 * encryption).
1241 		 */
1242 		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1243 		++ioq->msgcount;
1244 	}
1245 	dmsg_iocom_flush2(iocom);
1246 }
1247 
1248 /*
1249  * Thread localized, iocom->mtx not held by caller.
1250  *
1251  * (called from iocom_core via iocom_flush1 only)
1252  */
1253 void
1254 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1255 {
1256 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1257 	dmsg_msg_t *msg;
1258 	ssize_t n;
1259 	struct iovec iov[DMSG_IOQ_MAXIOVEC];
1260 	size_t nact;
1261 	size_t hbytes;
1262 	size_t abytes;
1263 	size_t hoff;
1264 	size_t aoff;
1265 	int iovcnt;
1266 	int save_errno;
1267 
1268 	if (ioq->error) {
1269 		dmsg_iocom_drain(iocom);
1270 		return;
1271 	}
1272 
1273 	/*
1274 	 * Pump messages out the connection by building an iovec.
1275 	 *
1276 	 * ioq->hbytes/ioq->abytes tracks how much of the first message
1277 	 * in the queue has been successfully written out, so we can
1278 	 * resume writing.
1279 	 */
1280 	iovcnt = 0;
1281 	nact = 0;
1282 	hoff = ioq->hbytes;
1283 	aoff = ioq->abytes;
1284 
1285 	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1286 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1287 			 DMSG_ALIGN;
1288 		abytes = DMSG_DOALIGN(msg->aux_size);
1289 		assert(hoff <= hbytes && aoff <= abytes);
1290 
1291 		if (hoff < hbytes) {
1292 			size_t maxlen = hbytes - hoff;
1293 			if (maxlen > sizeof(ioq->buf) / 2)
1294 				maxlen = sizeof(ioq->buf) / 2;
1295 			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1296 			iov[iovcnt].iov_len = maxlen;
1297 			nact += maxlen;
1298 			++iovcnt;
1299 			if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1300 			    maxlen != hbytes - hoff) {
1301 				break;
1302 			}
1303 		}
1304 		if (aoff < abytes) {
1305 			size_t maxlen = abytes - aoff;
1306 			if (maxlen > sizeof(ioq->buf) / 2)
1307 				maxlen = sizeof(ioq->buf) / 2;
1308 
1309 			assert(msg->aux_data != NULL);
1310 			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1311 			iov[iovcnt].iov_len = maxlen;
1312 			nact += maxlen;
1313 			++iovcnt;
1314 			if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1315 			    maxlen != abytes - aoff) {
1316 				break;
1317 			}
1318 		}
1319 		hoff = 0;
1320 		aoff = 0;
1321 	}
1322 
1323 	/*
1324 	 * Shortcut if no work to do.  Be sure to check for old work still
1325 	 * pending in the FIFO.
1326 	 */
1327 	if (iovcnt == 0 && ioq->fifo_beg == ioq->fifo_cdx)
1328 		return;
1329 
1330 	/*
1331 	 * Encrypt and write the data.  The crypto code will move the
1332 	 * data into the fifo and adjust the iov as necessary.  If
1333 	 * encryption is disabled the iov is left alone.
1334 	 *
1335 	 * May return a smaller iov (thus a smaller n), with aggregated
1336 	 * chunks.  May reduce nmax to what fits in the FIFO.
1337 	 *
1338 	 * This function sets nact to the number of original bytes now
1339 	 * encrypted, adding to the FIFO some number of bytes that might
1340 	 * be greater depending on the crypto mechanic.  iov[] is adjusted
1341 	 * to point at the FIFO if necessary.
1342 	 *
1343 	 * NOTE: nact is the number of bytes eaten from the message.  For
1344 	 *	 encrypted data this is the number of bytes processed for
1345 	 *	 encryption and not necessarily the number of bytes writable.
1346 	 *	 The return value from the writev() is the post-encrypted
1347 	 *	 byte count which might be larger.
1348 	 *
1349 	 * NOTE: For direct writes, nact is the return value from the writev().
1350 	 */
1351 	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1352 		/*
1353 		 * Make sure the FIFO has a reasonable amount of space
1354 		 * left (if not completely full).
1355 		 *
1356 		 * In this situation we are staging the encrypted message
1357 		 * data in the FIFO.  (nact) represents how much plaintext
1358 		 * has been staged, (n) represents how much encrypted data
1359 		 * has been flushed.  The two are independent of each other.
1360 		 */
1361 		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1362 		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1363 			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1364 			      ioq->fifo_end - ioq->fifo_beg);
1365 			ioq->fifo_cdx -= ioq->fifo_beg;
1366 			ioq->fifo_cdn -= ioq->fifo_beg;
1367 			ioq->fifo_end -= ioq->fifo_beg;
1368 			ioq->fifo_beg = 0;
1369 		}
1370 
1371 		/*
1372 		 * beg .... cdx ............ cdn ............. end
1373 		 * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1374 		 *
1375 		 * Advance fifo_beg on a successful write.
1376 		 */
1377 		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1378 		n = writev(iocom->sock_fd, iov, iovcnt);
1379 		save_errno = errno;
1380 		if (n > 0) {
1381 			ioq->fifo_beg += n;
1382 			if (ioq->fifo_beg == ioq->fifo_end) {
1383 				ioq->fifo_beg = 0;
1384 				ioq->fifo_cdn = 0;
1385 				ioq->fifo_cdx = 0;
1386 				ioq->fifo_end = 0;
1387 			}
1388 		}
1389 
1390 		/*
1391 		 * We don't mess with the nact returned by the crypto_encrypt
1392 		 * call, which represents the filling of the FIFO.  (n) tells
1393 		 * us how much we were able to write from the FIFO.  The two
1394 		 * are different beasts when encrypting.
1395 		 */
1396 	} else {
1397 		/*
1398 		 * In this situation we are not staging the messages to the
1399 		 * FIFO but instead writing them directly from the msg
1400 		 * structure(s) unencrypted, so (nact) is basically (n).
1401 		 */
1402 		n = writev(iocom->sock_fd, iov, iovcnt);
1403 		save_errno = errno;
1404 		if (n > 0)
1405 			nact = n;
1406 		else
1407 			nact = 0;
1408 	}
1409 
1410 	/*
1411 	 * Clean out the transmit queue based on what we successfully
1412 	 * encrypted (nact is the plaintext count) and is now in the FIFO.
1413 	 * ioq->hbytes/abytes represents the portion of the first message
1414 	 * previously sent.
1415 	 */
1416 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1417 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1418 			 DMSG_ALIGN;
1419 		abytes = DMSG_DOALIGN(msg->aux_size);
1420 
1421 		if ((size_t)nact < hbytes - ioq->hbytes) {
1422 			ioq->hbytes += nact;
1423 			nact = 0;
1424 			break;
1425 		}
1426 		nact -= hbytes - ioq->hbytes;
1427 		ioq->hbytes = hbytes;
1428 		if ((size_t)nact < abytes - ioq->abytes) {
1429 			ioq->abytes += nact;
1430 			nact = 0;
1431 			break;
1432 		}
1433 		nact -= abytes - ioq->abytes;
1434 		/* ioq->abytes = abytes; optimized out */
1435 
1436 		dmio_printf(iocom, 5,
1437 			    "txmsg cmd=%08x circ=%016jx\n",
1438 			    msg->any.head.cmd,
1439 			    (intmax_t)msg->any.head.circuit);
1440 
1441 #ifdef DMSG_BLOCK_DEBUG
1442 		uint32_t tcmd;
1443 
1444 		if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
1445 			if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
1446 				tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
1447 					    (msg->any.head.cmd & (DMSGF_CREATE |
1448 								  DMSGF_DELETE |
1449 								  DMSGF_REPLY));
1450 			} else {
1451 				tcmd = 0;
1452 			}
1453 		} else {
1454 			tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
1455 		}
1456 
1457 		switch (tcmd) {
1458 		case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
1459 		case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
1460 			dmio_printf(iocom, 4,
1461 				    "write BIO %-3d %016jx %d@%016jx\n",
1462 				    biocount, msg->any.head.msgid,
1463 				    msg->any.blk_read.bytes,
1464 				    msg->any.blk_read.offset);
1465 			break;
1466 		case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1467 		case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1468 			dmio_printf(iocom, 4,
1469 				    "wretr BIO %-3d %016jx %d@%016jx\n",
1470 				    biocount, msg->any.head.msgid,
1471 				    msg->any.blk_read.bytes,
1472 				    msg->any.blk_read.offset);
1473 			break;
1474 		default:
1475 			break;
1476 		}
1477 #endif
1478 
1479 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1480 		--ioq->msgcount;
1481 		ioq->hbytes = 0;
1482 		ioq->abytes = 0;
1483 		dmsg_msg_free(msg);
1484 	}
1485 	assert(nact == 0);
1486 
1487 	/*
1488 	 * Process the return value from the write w/regards to blocking.
1489 	 */
1490 	if (n < 0) {
1491 		if (save_errno != EINTR &&
1492 		    save_errno != EINPROGRESS &&
1493 		    save_errno != EAGAIN) {
1494 			/*
1495 			 * Fatal write error
1496 			 */
1497 			ioq->error = DMSG_IOQ_ERROR_SOCK;
1498 			dmsg_iocom_drain(iocom);
1499 		} else {
1500 			/*
1501 			 * Wait for socket buffer space, do not try to
1502 			 * process more packets for transmit until space
1503 			 * is available.
1504 			 */
1505 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1506 		}
1507 	} else if (TAILQ_FIRST(&ioq->msgq) ||
1508 		   TAILQ_FIRST(&iocom->txmsgq) ||
1509 		   ioq->fifo_beg != ioq->fifo_cdx) {
1510 		/*
1511 		 * If the write succeeded and more messages are pending
1512 		 * in either msgq, or the FIFO WWORK must remain set.
1513 		 */
1514 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
1515 	}
1516 	/* else no transmit-side work remains */
1517 
1518 	if (ioq->error) {
1519 		dmsg_iocom_drain(iocom);
1520 	}
1521 }
1522 
1523 /*
1524  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1525  * write events will occur.  We don't kill read msgs because we want
1526  * the caller to pull off our contrived terminal error msg to detect
1527  * the connection failure.
1528  *
1529  * Localized to iocom_core thread, iocom->mtx not held by caller.
1530  */
1531 void
1532 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1533 {
1534 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1535 	dmsg_msg_t *msg;
1536 
1537 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1538 	ioq->hbytes = 0;
1539 	ioq->abytes = 0;
1540 
1541 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1542 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1543 		--ioq->msgcount;
1544 		dmsg_msg_free(msg);
1545 	}
1546 }
1547 
1548 /*
1549  * Write a message to an iocom, with additional state processing.
1550  */
1551 void
1552 dmsg_msg_write(dmsg_msg_t *msg)
1553 {
1554 	dmsg_iocom_t *iocom = msg->state->iocom;
1555 	dmsg_state_t *state;
1556 	char dummy;
1557 
1558 	pthread_mutex_lock(&iocom->mtx);
1559 	state = msg->state;
1560 
1561 	dmio_printf(iocom, 5,
1562 		    "msgtx: cmd=%08x msgid=%016jx "
1563 		    "state %p(%08x) error=%d\n",
1564 		    msg->any.head.cmd, msg->any.head.msgid,
1565 		    state, (state ? state->icmd : 0),
1566 		    msg->any.head.error);
1567 
1568 
1569 #if 0
1570 	/*
1571 	 * Make sure the parent transaction is still open in the transmit
1572 	 * direction.  If it isn't the message is dead and we have to
1573 	 * potentially simulate a rxmsg terminating the transaction.
1574 	 */
1575 	if ((state->parent->txcmd & DMSGF_DELETE) ||
1576 	    (state->parent->rxcmd & DMSGF_DELETE)) {
1577 		dmio_printf(iocom, 4, "dmsg_msg_write: EARLY TERMINATION\n");
1578 		dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1579 		dmsg_state_cleanuptx(iocom, msg);
1580 		dmsg_msg_free(msg);
1581 		pthread_mutex_unlock(&iocom->mtx);
1582 		return;
1583 	}
1584 #endif
1585 	/*
1586 	 * Process state data into the message as needed, then update the
1587 	 * state based on the message.
1588 	 */
1589 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1590 		/*
1591 		 * Existing transaction (could be reply).  It is also
1592 		 * possible for this to be the first reply (CREATE is set),
1593 		 * in which case we populate state->txcmd.
1594 		 *
1595 		 * state->txcmd is adjusted to hold the final message cmd,
1596 		 * and we also be sure to set the CREATE bit here.  We did
1597 		 * not set it in dmsg_msg_alloc() because that would have
1598 		 * not been serialized (state could have gotten ripped out
1599 		 * from under the message prior to it being transmitted).
1600 		 */
1601 		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1602 		    DMSGF_CREATE) {
1603 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1604 			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1605 			state->flags &= ~DMSG_STATE_NEW;
1606 		}
1607 		msg->any.head.msgid = state->msgid;
1608 
1609 		if (msg->any.head.cmd & DMSGF_CREATE) {
1610 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1611 		}
1612 	}
1613 
1614 	/*
1615 	 * Discard messages sent to transactions which are already dead.
1616 	 */
1617 	if (state && (state->txcmd & DMSGF_DELETE)) {
1618 		dmio_printf(iocom, 4,
1619 			    "dmsg_msg_write: drop msg %08x to dead "
1620 			    "circuit state=%p\n",
1621 			    msg->any.head.cmd, state);
1622 		dmsg_msg_free(msg);
1623 		return;
1624 	}
1625 
1626 	/*
1627 	 * Normally we queue the msg for output.  However, if the circuit is
1628 	 * dead or dying we must simulate a failure in the return direction
1629 	 * and throw the message away.  The other end is not expecting any
1630 	 * further messages from us on this state.
1631 	 *
1632 	 * Note that the I/O thread is responsible for generating the CRCs
1633 	 * and encryption.
1634 	 */
1635 	if (state->flags & DMSG_STATE_DYING) {
1636 #if 0
1637 	if ((state->parent->txcmd & DMSGF_DELETE) ||
1638 	    (state->parent->flags & DMSG_STATE_DYING) ||
1639 	    (state->flags & DMSG_STATE_DYING)) {
1640 #endif
1641 		/*
1642 		 * Illegal message, kill state and related sub-state.
1643 		 * Cannot transmit if state is already dying.
1644 		 */
1645 		dmio_printf(iocom, 4,
1646 			    "dmsg_msg_write: Write to dying circuit "
1647 			    "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
1648 			    state->parent->rxcmd,
1649 			    state->parent->txcmd,
1650 			    state->parent->flags);
1651 		dmsg_state_hold(state);
1652 		dmsg_state_cleanuptx(iocom, msg);
1653 		if ((state->flags & DMSG_STATE_ABORTING) == 0) {
1654 			dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
1655 		}
1656 		dmsg_state_drop(state);
1657 		dmsg_msg_free(msg);
1658 	} else {
1659 		/*
1660 		 * Queue the message, clean up transmit state prior to queueing
1661 		 * to avoid SMP races.
1662 		 */
1663 		dmio_printf(iocom, 5,
1664 			    "dmsg_msg_write: commit msg state=%p to txkmsgq\n",
1665 			    state);
1666 		dmsg_state_cleanuptx(iocom, msg);
1667 		TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1668 		dummy = 0;
1669 		write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
1670 	}
1671 	pthread_mutex_unlock(&iocom->mtx);
1672 }
1673 
1674 /*
1675  * Remove state from its parent's subq.  This can wind up recursively
1676  * dropping the parent upward.
1677  *
1678  * NOTE: iocom must be locked.
1679  *
1680  * NOTE: Once we drop the parent, our pstate pointer may become invalid.
1681  */
1682 static
1683 void
1684 dmsg_subq_delete(dmsg_state_t *state)
1685 {
1686 	dmsg_state_t *pstate;
1687 
1688 	if (state->flags & DMSG_STATE_SUBINSERTED) {
1689 		pstate = state->parent;
1690 		assert(pstate);
1691 		if (pstate->scan == state)
1692 			pstate->scan = NULL;
1693 		TAILQ_REMOVE(&pstate->subq, state, entry);
1694 		state->flags &= ~DMSG_STATE_SUBINSERTED;
1695 		state->parent = NULL;
1696 		if (TAILQ_EMPTY(&pstate->subq))
1697 			dmsg_state_drop(pstate);/* pstate->subq */
1698 		pstate = NULL;			/* safety */
1699 		dmsg_state_drop(state);         /* pstate->subq */
1700 	} else {
1701 		assert(state->parent == NULL);
1702 	}
1703 }
1704 
1705 /*
1706  * Simulate reception of a transaction DELETE message when the link goes
1707  * bad.  This routine must recurse through state->subq and generate messages
1708  * and callbacks bottom-up.
1709  *
1710  * iocom->mtx must be held by caller.
1711  */
1712 static
1713 void
1714 dmsg_simulate_failure(dmsg_state_t *state, int meto, int error)
1715 {
1716 	dmsg_state_t *substate;
1717 
1718 	dmsg_state_hold(state);
1719 	if (meto)
1720 		dmsg_state_abort(state);
1721 
1722 	/*
1723 	 * Recurse through sub-states.
1724 	 */
1725 again:
1726 	TAILQ_FOREACH(substate, &state->subq, entry) {
1727 		if (substate->flags & DMSG_STATE_ABORTING)
1728 			continue;
1729 		state->scan = substate;
1730 		dmsg_simulate_failure(substate, 1, error);
1731 		if (state->scan != substate)
1732 			goto again;
1733 	}
1734 
1735 	dmsg_state_drop(state);
1736 }
1737 
1738 static
1739 void
1740 dmsg_state_abort(dmsg_state_t *state)
1741 {
1742 	dmsg_iocom_t *iocom;
1743 	dmsg_msg_t *msg;
1744 
1745 	/*
1746 	 * Set ABORTING and DYING, return if already set.  If the state was
1747 	 * just allocated we defer the abort operation until the related
1748 	 * message is processed.
1749 	 */
1750 	if (state->flags & DMSG_STATE_ABORTING)
1751 		return;
1752 	state->flags |= DMSG_STATE_ABORTING;
1753 	dmsg_state_dying(state);
1754 	if (state->flags & DMSG_STATE_NEW) {
1755 		dmio_printf(iocom, 4,
1756 			    "dmsg_state_abort(0): state %p rxcmd %08x "
1757 			    "txcmd %08x flags %08x - in NEW state\n",
1758 			    state, state->rxcmd,
1759 			    state->txcmd, state->flags);
1760 		return;
1761 	}
1762 
1763 	/*
1764 	 * Simulate parent state failure before child states.  Device
1765 	 * drivers need to understand this and flag the situation but might
1766 	 * have asynchronous operations in progress that they cannot stop.
1767 	 * To make things easier, parent states will not actually disappear
1768 	 * until the children are all gone.
1769 	 */
1770 	if ((state->rxcmd & DMSGF_DELETE) == 0) {
1771 		dmio_printf(iocom, 5,
1772 			    "dmsg_state_abort() on state %p\n",
1773 			    state);
1774 		msg = dmsg_msg_alloc_locked(state, 0, DMSG_LNK_ERROR,
1775 					    NULL, NULL);
1776 		if ((state->rxcmd & DMSGF_CREATE) == 0)
1777 			msg->any.head.cmd |= DMSGF_CREATE;
1778 		msg->any.head.cmd |= DMSGF_DELETE |
1779 				     (state->rxcmd & DMSGF_REPLY);
1780 		msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
1781 		msg->any.head.error = DMSG_ERR_LOSTLINK;
1782 		msg->any.head.cmd |= DMSGF_ABORT;
1783 
1784 		/*
1785 		 * Issue callback synchronously even though this isn't
1786 		 * the receiver thread.  We need to issue the callback
1787 		 * before removing state from the subq in order to allow
1788 		 * the callback to reply.
1789 		 */
1790 		iocom = state->iocom;
1791 		dmsg_state_msgrx(msg, 1);
1792 		pthread_mutex_unlock(&iocom->mtx);
1793 		iocom->rcvmsg_callback(msg);
1794 		pthread_mutex_lock(&iocom->mtx);
1795 		dmsg_state_cleanuprx(iocom, msg);
1796 #if 0
1797 		TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1798 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1799 #endif
1800 	}
1801 }
1802 
1803 
1804 /*
1805  * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing
1806  * the transmission of any new messages on these states.  This is done
1807  * atomically when parent state is terminating, whereas setting ABORTING is
1808  * not atomic and can leak races.
1809  */
1810 static
1811 void
1812 dmsg_state_dying(dmsg_state_t *state)
1813 {
1814 	dmsg_state_t *scan;
1815 
1816 	if ((state->flags & DMSG_STATE_DYING) == 0) {
1817 		state->flags |= DMSG_STATE_DYING;
1818 		TAILQ_FOREACH(scan, &state->subq, entry)
1819 			dmsg_state_dying(scan);
1820 	}
1821 }
1822 
1823 /*
1824  * This is a shortcut to formulate a reply to msg with a simple error code,
1825  * It can reply to and terminate a transaction, or it can reply to a one-way
1826  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1827  * the error code (which can be 0).  Not all transactions are terminated
1828  * with DMSG_LNK_ERROR status (the low level only cares about the
1829  * MSGF_DELETE flag), but most are.
1830  *
1831  * Replies to one-way messages are a bit of an oxymoron but the feature
1832  * is used by the debug (DBG) protocol.
1833  *
1834  * The reply contains no extended data.
1835  */
1836 void
1837 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1838 {
1839 	dmsg_state_t *state = msg->state;
1840 	dmsg_msg_t *nmsg;
1841 	uint32_t cmd;
1842 
1843 	/*
1844 	 * Reply with a simple error code and terminate the transaction.
1845 	 */
1846 	cmd = DMSG_LNK_ERROR;
1847 
1848 	/*
1849 	 * Check if our direction has even been initiated yet, set CREATE.
1850 	 *
1851 	 * Check what direction this is (command or reply direction).  Note
1852 	 * that txcmd might not have been initiated yet.
1853 	 *
1854 	 * If our direction has already been closed we just return without
1855 	 * doing anything.
1856 	 */
1857 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1858 		if (state->txcmd & DMSGF_DELETE)
1859 			return;
1860 		if (state->txcmd & DMSGF_REPLY)
1861 			cmd |= DMSGF_REPLY;
1862 		cmd |= DMSGF_DELETE;
1863 	} else {
1864 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1865 			cmd |= DMSGF_REPLY;
1866 	}
1867 
1868 	/*
1869 	 * Allocate the message and associate it with the existing state.
1870 	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1871 	 * allocate new state.  We have our state already.
1872 	 */
1873 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1874 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1875 		if ((state->txcmd & DMSGF_CREATE) == 0)
1876 			nmsg->any.head.cmd |= DMSGF_CREATE;
1877 	}
1878 	nmsg->any.head.error = error;
1879 
1880 	dmsg_msg_write(nmsg);
1881 }
1882 
1883 /*
1884  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1885  * we are generating a streaming reply or an intermediate acknowledgement
1886  * of some sort as part of the higher level protocol, with more to come
1887  * later.
1888  */
1889 void
1890 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1891 {
1892 	dmsg_state_t *state = msg->state;
1893 	dmsg_msg_t *nmsg;
1894 	uint32_t cmd;
1895 
1896 
1897 	/*
1898 	 * Reply with a simple error code and terminate the transaction.
1899 	 */
1900 	cmd = DMSG_LNK_ERROR;
1901 
1902 	/*
1903 	 * Check if our direction has even been initiated yet, set CREATE.
1904 	 *
1905 	 * Check what direction this is (command or reply direction).  Note
1906 	 * that txcmd might not have been initiated yet.
1907 	 *
1908 	 * If our direction has already been closed we just return without
1909 	 * doing anything.
1910 	 */
1911 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1912 		if (state->txcmd & DMSGF_DELETE)
1913 			return;
1914 		if (state->txcmd & DMSGF_REPLY)
1915 			cmd |= DMSGF_REPLY;
1916 		/* continuing transaction, do not set MSGF_DELETE */
1917 	} else {
1918 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1919 			cmd |= DMSGF_REPLY;
1920 	}
1921 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1922 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1923 		if ((state->txcmd & DMSGF_CREATE) == 0)
1924 			nmsg->any.head.cmd |= DMSGF_CREATE;
1925 	}
1926 	nmsg->any.head.error = error;
1927 
1928 	dmsg_msg_write(nmsg);
1929 }
1930 
1931 /*
1932  * Terminate a transaction given a state structure by issuing a DELETE.
1933  * (the state structure must not be &iocom->state0)
1934  */
1935 void
1936 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1937 {
1938 	dmsg_msg_t *nmsg;
1939 	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1940 
1941 	/*
1942 	 * Nothing to do if we already transmitted a delete
1943 	 */
1944 	if (state->txcmd & DMSGF_DELETE)
1945 		return;
1946 
1947 	/*
1948 	 * Set REPLY if the other end initiated the command.  Otherwise
1949 	 * we are the command direction.
1950 	 */
1951 	if (state->txcmd & DMSGF_REPLY)
1952 		cmd |= DMSGF_REPLY;
1953 
1954 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1955 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1956 		if ((state->txcmd & DMSGF_CREATE) == 0)
1957 			nmsg->any.head.cmd |= DMSGF_CREATE;
1958 	}
1959 	nmsg->any.head.error = error;
1960 	dmsg_msg_write(nmsg);
1961 }
1962 
1963 /*
1964  * Terminate a transaction given a state structure by issuing a DELETE.
1965  * (the state structure must not be &iocom->state0)
1966  */
1967 void
1968 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1969 {
1970 	dmsg_msg_t *nmsg;
1971 	uint32_t cmd = DMSG_LNK_ERROR;
1972 
1973 	/*
1974 	 * Nothing to do if we already transmitted a delete
1975 	 */
1976 	if (state->txcmd & DMSGF_DELETE)
1977 		return;
1978 
1979 	/*
1980 	 * Set REPLY if the other end initiated the command.  Otherwise
1981 	 * we are the command direction.
1982 	 */
1983 	if (state->txcmd & DMSGF_REPLY)
1984 		cmd |= DMSGF_REPLY;
1985 
1986 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1987 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1988 		if ((state->txcmd & DMSGF_CREATE) == 0)
1989 			nmsg->any.head.cmd |= DMSGF_CREATE;
1990 	}
1991 	nmsg->any.head.error = error;
1992 	dmsg_msg_write(nmsg);
1993 }
1994 
1995 /************************************************************************
1996  *			TRANSACTION STATE HANDLING			*
1997  ************************************************************************
1998  *
1999  */
2000 
2001 /*
2002  * Process state tracking for a message after reception, prior to execution.
2003  * Possibly route the message (consuming it).
2004  *
2005  * Called with msglk held and the msg dequeued.
2006  *
2007  * All messages are called with dummy state and return actual state.
2008  * (One-off messages often just return the same dummy state).
2009  *
2010  * May request that caller discard the message by setting *discardp to 1.
2011  * The returned state is not used in this case and is allowed to be NULL.
2012  *
2013  * --
2014  *
2015  * These routines handle persistent and command/reply message state via the
2016  * CREATE and DELETE flags.  The first message in a command or reply sequence
2017  * sets CREATE, the last message in a command or reply sequence sets DELETE.
2018  *
2019  * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
2021  * which set neither flag.  This represents a streaming command or reply.
2022  *
2023  * Any command message received with CREATE set expects a reply sequence to
2024  * be returned.  Reply sequences work the same as command sequences except the
2025  * REPLY bit is also sent.  Both the command side and reply side can
2026  * degenerate into a single message with both CREATE and DELETE set.  Note
2027  * that one side can be streaming and the other side not, or neither, or both.
2028  *
2029  * The msgid is unique for the initiator.  That is, two sides sending a new
2030  * message can use the same msgid without colliding.
2031  *
2032  * --
2033  *
 * The message may be running over a circuit.  If the circuit is half-deleted,
 * the message is typically racing against a link failure and must be thrown
2036  * out.  As the circuit deletion propagates the library will automatically
2037  * generate terminations for sub states.
2038  *
2039  * --
2040  *
2041  * ABORT sequences work by setting the ABORT flag along with normal message
2042  * state.  However, ABORTs can also be sent on half-closed messages, that is
2043  * even if the command or reply side has already sent a DELETE, as long as
2044  * the message has not been fully closed it can still send an ABORT+DELETE
2045  * to terminate the half-closed message state.
2046  *
 * Since ABORT+DELETEs can race we silently discard ABORTs for message
2048  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
2049  * also race, and in this situation the other side might have already
2050  * initiated a new unrelated command with the same message id.  Since
2051  * the abort has not set the CREATE flag the situation can be detected
2052  * and the message will also be discarded.
2053  *
2054  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
2055  * The ABORT request is essentially integrated into the command instead
2056  * of being sent later on.  In this situation the command implementation
2057  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
2058  * special-case non-blocking operation for the command.
2059  *
2060  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
2061  *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
2062  *	  one-way messages are not supported.
2063  *
2064  * NOTE!  If a command sequence does not support aborts the ABORT flag is
2065  *	  simply ignored.
2066  *
2067  * --
2068  *
2069  * One-off messages (no reply expected) are sent without an established
2070  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
2071  * For one-off messages sent over circuits msgid generally MUST be 0.
2072  *
2073  * One-off messages cannot be aborted and typically aren't processed
2074  * by these routines.  Order is still guaranteed for messages sent over
2075  * the same circuit.  The REPLY bit can be used to distinguish whether
2076  * a one-off message is a command or reply.  For example, one-off replies
2077  * will typically just contain status updates.
2078  */
2079 static int
2080 dmsg_state_msgrx(dmsg_msg_t *msg, int mstate)
2081 {
2082 	dmsg_iocom_t *iocom = msg->state->iocom;
2083 	dmsg_state_t *state;
2084 	dmsg_state_t *pstate;
2085 	dmsg_state_t sdummy;
2086 	int error;
2087 
2088 	pthread_mutex_lock(&iocom->mtx);
2089 
2090 	if (DMsgDebugOpt) {
2091 		dmio_printf(iocom, 5,
2092 			    "msgrx: cmd=%08x msgid=%016jx "
2093 			    "circuit=%016jx error=%d\n",
2094 			    msg->any.head.cmd,
2095 			    msg->any.head.msgid,
2096 			    msg->any.head.circuit,
2097 			    msg->any.head.error);
2098 	}
2099 
2100 	/*
2101 	 * Lookup the circuit (pstate).  The circuit will be an open
2102 	 * transaction.  The REVCIRC bit in the message tells us which side
2103 	 * initiated it.
2104 	 *
2105 	 * If mstate is non-zero the state has already been incorporated
2106 	 * into the message as part of a simulated abort.  Note that in this
2107 	 * situation the parent state may have already been removed from
2108 	 * the RBTREE.
2109 	 */
2110 	if (mstate) {
2111 		pstate = msg->state->parent;
2112 	} else if (msg->any.head.circuit) {
2113 		sdummy.msgid = msg->any.head.circuit;
2114 
2115 		if (msg->any.head.cmd & DMSGF_REVCIRC) {
2116 			pstate = RB_FIND(dmsg_state_tree,
2117 					 &iocom->statewr_tree,
2118 					 &sdummy);
2119 		} else {
2120 			pstate = RB_FIND(dmsg_state_tree,
2121 					 &iocom->staterd_tree,
2122 					 &sdummy);
2123 		}
2124 
2125 		/*
2126 		 * If we cannot find the circuit throw the message away.
2127 		 * The state will have already been taken care of by
2128 		 * the simulated failure code.  This case can occur due
2129 		 * to a failure propagating in one direction crossing a
2130 		 * request on the failed circuit propagating in the other
2131 		 * direction.
2132 		 */
2133 		if (pstate == NULL) {
2134 			dmio_printf(iocom, 4,
2135 				    "missing parent in stacked trans %s\n",
2136 				    dmsg_msg_str(msg));
2137 			pthread_mutex_unlock(&iocom->mtx);
2138 			error = DMSG_IOQ_ERROR_EALREADY;
2139 
2140 			return error;
2141 		}
2142 	} else {
2143 		pstate = &iocom->state0;
2144 	}
2145 	/* WARNING: pstate not (yet) refd */
2146 
2147 	/*
2148 	 * Lookup the msgid.
2149 	 *
2150 	 * If mstate is non-zero the state has already been incorporated
2151 	 * into the message as part of a simulated abort.  Note that in this
2152 	 * situation the state may have already been removed from the RBTREE.
2153 	 *
2154 	 * If received msg is a command state is on staterd_tree.
2155 	 * If received msg is a reply state is on statewr_tree.
2156 	 * Otherwise there is no state (retain &iocom->state0)
2157 	 */
2158 	if (mstate) {
2159 		state = msg->state;
2160 	} else {
2161 		sdummy.msgid = msg->any.head.msgid;
2162 		if (msg->any.head.cmd & DMSGF_REVTRANS) {
2163 			state = RB_FIND(dmsg_state_tree,
2164 					&iocom->statewr_tree, &sdummy);
2165 		} else {
2166 			state = RB_FIND(dmsg_state_tree,
2167 					&iocom->staterd_tree, &sdummy);
2168 		}
2169 	}
2170 
2171 	if (DMsgDebugOpt) {
2172 		dmio_printf(iocom, 5, "msgrx:\tstate %p(%08x)",
2173 			    state, (state ? state->icmd : 0));
2174 		if (pstate != &iocom->state0) {
2175 			dmio_printf(iocom, 5,
2176 				    " pstate %p(%08x)",
2177 				    pstate, pstate->icmd);
2178 		}
2179 		dmio_printf(iocom, 5, "%s\n", "");
2180 	}
2181 
2182 	if (mstate) {
2183 		/* state already assigned to msg */
2184 	} else if (state) {
2185 		/*
2186 		 * Message over an existing transaction (CREATE should not
2187 		 * be set).
2188 		 */
2189 		dmsg_state_drop(msg->state);
2190 		dmsg_state_hold(state);
2191 		msg->state = state;
2192 		assert(pstate == state->parent);
2193 	} else {
2194 		/*
2195 		 * Either a new transaction (if CREATE set) or a one-off.
2196 		 */
2197 		state = pstate;
2198 	}
2199 
2200 	/*
2201 	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2202 	 * inside the case statements.
2203 	 *
2204 	 * Construct new state as necessary.
2205 	 */
2206 	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2207 				    DMSGF_REPLY)) {
2208 	case DMSGF_CREATE:
2209 	case DMSGF_CREATE | DMSGF_DELETE:
2210 		/*
2211 		 * Create new sub-transaction under pstate.
2212 		 * (any DELETE is handled in post-processing of msg).
2213 		 *
2214 		 * (During routing the msgid was made unique for this
2215 		 * direction over the comlink, so our RB trees can be
2216 		 * iocom-based instead of state-based).
2217 		 */
2218 		if (state != pstate) {
2219 			dmio_printf(iocom, 2,
2220 				    "duplicate transaction %s\n",
2221 				    dmsg_msg_str(msg));
2222 			error = DMSG_IOQ_ERROR_TRANS;
2223 			assert(0);
2224 			break;
2225 		}
2226 
2227 		/*
2228 		 * Allocate the new state.
2229 		 */
2230 		state = malloc(sizeof(*state));
2231 		bzero(state, sizeof(*state));
2232 		atomic_add_int(&dmsg_state_count, 1);
2233 
2234 		TAILQ_INIT(&state->subq);
2235 		dmsg_state_hold(pstate);
2236 		state->parent = pstate;
2237 		state->iocom = iocom;
2238 		state->flags = DMSG_STATE_DYNAMIC |
2239 			       DMSG_STATE_OPPOSITE;
2240 		state->msgid = msg->any.head.msgid;
2241 		state->txcmd = DMSGF_REPLY;
2242 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2243 		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2244 		state->flags &= ~DMSG_STATE_NEW;
2245 		msg->state = state;
2246 
2247 		RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2248 		if (TAILQ_EMPTY(&pstate->subq))
2249 			dmsg_state_hold(pstate);/* pstate->subq */
2250 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2251 		state->flags |= DMSG_STATE_SUBINSERTED |
2252 				DMSG_STATE_RBINSERTED;
2253 		dmsg_state_hold(state);		/* pstate->subq */
2254 		dmsg_state_hold(state);		/* state on rbtree */
2255 		dmsg_state_hold(state);		/* msg->state */
2256 
2257 		/*
2258 		 * If the parent is a relay set up the state handler to
2259 		 * automatically route the message.  Local processing will
2260 		 * not occur if set.
2261 		 *
2262 		 * (state relays are seeded by SPAN processing)
2263 		 */
2264 		if (pstate->relay)
2265 			state->func = dmsg_state_relay;
2266 		error = 0;
2267 		break;
2268 	case DMSGF_DELETE:
2269 		/*
2270 		 * Persistent state is expected but might not exist if an
2271 		 * ABORT+DELETE races the close.
2272 		 *
2273 		 * (any DELETE is handled in post-processing of msg).
2274 		 */
2275 		if (state == pstate) {
2276 			if (msg->any.head.cmd & DMSGF_ABORT) {
2277 				error = DMSG_IOQ_ERROR_EALREADY;
2278 			} else {
2279 				dmio_printf(iocom, 2,
2280 					    "missing-state %s\n",
2281 					    dmsg_msg_str(msg));
2282 				error = DMSG_IOQ_ERROR_TRANS;
2283 				assert(0);
2284 			}
2285 			break;
2286 		}
2287 
2288 		/*
2289 		 * Handle another ABORT+DELETE case if the msgid has already
2290 		 * been reused.
2291 		 */
2292 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2293 			if (msg->any.head.cmd & DMSGF_ABORT) {
2294 				error = DMSG_IOQ_ERROR_EALREADY;
2295 			} else {
2296 				dmio_printf(iocom, 2,
2297 					    "reused-state %s\n",
2298 					    dmsg_msg_str(msg));
2299 				error = DMSG_IOQ_ERROR_TRANS;
2300 				assert(0);
2301 			}
2302 			break;
2303 		}
2304 		error = 0;
2305 		break;
2306 	default:
2307 		/*
2308 		 * Check for mid-stream ABORT command received, otherwise
2309 		 * allow.
2310 		 */
2311 		if (msg->any.head.cmd & DMSGF_ABORT) {
2312 			if ((state == pstate) ||
2313 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2314 				error = DMSG_IOQ_ERROR_EALREADY;
2315 				break;
2316 			}
2317 		}
2318 		error = 0;
2319 		break;
2320 	case DMSGF_REPLY | DMSGF_CREATE:
2321 	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2322 		/*
2323 		 * When receiving a reply with CREATE set the original
2324 		 * persistent state message should already exist.
2325 		 */
2326 		if (state == pstate) {
2327 			dmio_printf(iocom, 2, "no-state(r) %s\n",
2328 				    dmsg_msg_str(msg));
2329 			error = DMSG_IOQ_ERROR_TRANS;
2330 			assert(0);
2331 			break;
2332 		}
2333 		assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2334 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2335 		error = 0;
2336 		break;
2337 	case DMSGF_REPLY | DMSGF_DELETE:
2338 		/*
2339 		 * Received REPLY+ABORT+DELETE in case where msgid has
2340 		 * already been fully closed, ignore the message.
2341 		 */
2342 		if (state == pstate) {
2343 			if (msg->any.head.cmd & DMSGF_ABORT) {
2344 				error = DMSG_IOQ_ERROR_EALREADY;
2345 			} else {
2346 				dmio_printf(iocom, 2,
2347 					    "no-state(r,d) %s\n",
2348 					    dmsg_msg_str(msg));
2349 				error = DMSG_IOQ_ERROR_TRANS;
2350 				assert(0);
2351 			}
2352 			break;
2353 		}
2354 
2355 		/*
2356 		 * Received REPLY+ABORT+DELETE in case where msgid has
2357 		 * already been reused for an unrelated message,
2358 		 * ignore the message.
2359 		 */
2360 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2361 			if (msg->any.head.cmd & DMSGF_ABORT) {
2362 				error = DMSG_IOQ_ERROR_EALREADY;
2363 			} else {
2364 				dmio_printf(iocom, 2,
2365 					    "reused-state(r,d) %s\n",
2366 					    dmsg_msg_str(msg));
2367 				error = DMSG_IOQ_ERROR_TRANS;
2368 				assert(0);
2369 			}
2370 			break;
2371 		}
2372 		error = 0;
2373 		break;
2374 	case DMSGF_REPLY:
2375 		/*
2376 		 * Check for mid-stream ABORT reply received to sent command.
2377 		 */
2378 		if (msg->any.head.cmd & DMSGF_ABORT) {
2379 			if (state == pstate ||
2380 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2381 				error = DMSG_IOQ_ERROR_EALREADY;
2382 				break;
2383 			}
2384 		}
2385 		error = 0;
2386 		break;
2387 	}
2388 
2389 	/*
2390 	 * Calculate the easy-switch() transactional command.  Represents
2391 	 * the outer-transaction command for any transaction-create or
2392 	 * transaction-delete, and the inner message command for any
2393 	 * non-transaction or inside-transaction command.  tcmd will be
2394 	 * set to 0 for any messaging error condition.
2395 	 *
2396 	 * The two can be told apart because outer-transaction commands
2397 	 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2398 	 */
2399 	if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2400 		if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
2401 			msg->tcmd = (state->icmd & DMSGF_BASECMDMASK) |
2402 				    (msg->any.head.cmd & (DMSGF_CREATE |
2403 							  DMSGF_DELETE |
2404 							  DMSGF_REPLY));
2405 		} else {
2406 			msg->tcmd = 0;
2407 		}
2408 	} else {
2409 		msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2410 	}
2411 
2412 #ifdef DMSG_BLOCK_DEBUG
2413 	switch (msg->tcmd) {
2414 	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2415 	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2416 		dmio_printf(iocom, 4,
2417 			    "read  BIO %-3d %016jx %d@%016jx\n",
2418 			    biocount, msg->any.head.msgid,
2419 			    msg->any.blk_read.bytes,
2420 			    msg->any.blk_read.offset);
2421 		break;
2422 	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2423 	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2424 		dmio_printf(iocom, 4,
2425 			    "rread BIO %-3d %016jx %d@%016jx\n",
2426 			    biocount, msg->any.head.msgid,
2427 			    msg->any.blk_read.bytes,
2428 			    msg->any.blk_read.offset);
2429 		break;
2430 	default:
2431 		break;
2432 	}
2433 #endif
2434 
2435 	/*
2436 	 * Adjust state, mark receive side as DELETED if appropriate and
2437 	 * adjust RB tree if both sides are DELETED.  cleanuprx handles
2438 	 * the rest after the state callback returns.
2439 	 */
2440 	assert(msg->state->iocom == iocom);
2441 	assert(msg->state == state);
2442 
2443 	if (state->flags & DMSG_STATE_ROOT) {
2444 		/*
2445 		 * Nothing to do for non-transactional messages.
2446 		 */
2447 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2448 		/*
2449 		 * Message terminating transaction, remove the state from
2450 		 * the RB tree if the full transaction is now complete.
2451 		 * The related state, subq, and parent link is retained
2452 		 * until after the state callback is complete.
2453 		 */
2454 		assert((state->rxcmd & DMSGF_DELETE) == 0);
2455 		state->rxcmd |= DMSGF_DELETE;
2456 		if (state->txcmd & DMSGF_DELETE) {
2457 			assert(state->flags & DMSG_STATE_RBINSERTED);
2458 			if (state->rxcmd & DMSGF_REPLY) {
2459 				assert(msg->any.head.cmd & DMSGF_REPLY);
2460 				RB_REMOVE(dmsg_state_tree,
2461 					  &iocom->statewr_tree, state);
2462 			} else {
2463 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2464 				RB_REMOVE(dmsg_state_tree,
2465 					  &iocom->staterd_tree, state);
2466 			}
2467 			state->flags &= ~DMSG_STATE_RBINSERTED;
2468 			dmsg_state_drop(state);
2469 		}
2470 	}
2471 
2472 	pthread_mutex_unlock(&iocom->mtx);
2473 
2474 	if (DMsgDebugOpt && error)
2475 		dmio_printf(iocom, 1, "msgrx: error %d\n", error);
2476 
2477 	return (error);
2478 }
2479 
2480 /*
2481  * Route the message and handle pair-state processing.
2482  */
2483 void
2484 dmsg_state_relay(dmsg_msg_t *lmsg)
2485 {
2486 	dmsg_state_t *lpstate;
2487 	dmsg_state_t *rpstate;
2488 	dmsg_state_t *lstate;
2489 	dmsg_state_t *rstate;
2490 	dmsg_msg_t *rmsg;
2491 
2492 #ifdef DMSG_BLOCK_DEBUG
2493 	switch (lmsg->tcmd) {
2494 	case DMSG_BLK_OPEN | DMSGF_CREATE:
2495 		dmio_printf(iocom, 4, "%s\n",
2496 			    "relay BIO_OPEN (CREATE)");
2497 		break;
2498 	case DMSG_BLK_OPEN | DMSGF_DELETE:
2499 		dmio_printf(iocom, 4, "%s\n",
2500 			    "relay BIO_OPEN (DELETE)");
2501 		break;
2502 	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2503 	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2504 		atomic_add_int(&biocount, 1);
2505 		dmio_printf(iocom, 4,
2506 			    "relay BIO %-3d %016jx %d@%016jx\n",
2507 			    biocount, lmsg->any.head.msgid,
2508 			    lmsg->any.blk_read.bytes,
2509 			    lmsg->any.blk_read.offset);
2510 		break;
2511 	case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2512 	case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2513 		dmio_printf(iocom, 4,
2514 			    "retrn BIO %-3d %016jx %d@%016jx\n",
2515 			    biocount, lmsg->any.head.msgid,
2516 			    lmsg->any.blk_read.bytes,
2517 			    lmsg->any.blk_read.offset);
2518 		atomic_add_int(&biocount, -1);
2519 		break;
2520 	default:
2521 		break;
2522 	}
2523 #endif
2524 
2525 	if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2526 	    DMSGF_CREATE) {
2527 		/*
2528 		 * New sub-transaction, establish new state and relay.
2529 		 */
2530 		lstate = lmsg->state;
2531 		lpstate = lstate->parent;
2532 		rpstate = lpstate->relay;
2533 		assert(lstate->relay == NULL);
2534 		assert(rpstate != NULL);
2535 
2536 		rmsg = dmsg_msg_alloc(rpstate, 0,
2537 				      lmsg->any.head.cmd,
2538 				      dmsg_state_relay, NULL);
2539 		rstate = rmsg->state;
2540 		rstate->relay = lstate;
2541 		lstate->relay = rstate;
2542 		dmsg_state_hold(lstate);
2543 		dmsg_state_hold(rstate);
2544 	} else {
2545 		/*
2546 		 * State & relay already established
2547 		 */
2548 		lstate = lmsg->state;
2549 		rstate = lstate->relay;
2550 		assert(rstate != NULL);
2551 
2552 		assert((rstate->txcmd & DMSGF_DELETE) == 0);
2553 
2554 #if 0
2555 		if (lstate->flags & DMSG_STATE_ABORTING) {
2556 			dmio_printf(iocom, 4,
2557 				    "relay: relay lost link l=%p r=%p\n",
2558 				    lstate, rstate);
2559 			dmsg_simulate_failure(rstate, 0, DMSG_ERR_LOSTLINK);
2560 		}
2561 #endif
2562 
2563 		rmsg = dmsg_msg_alloc(rstate, 0,
2564 				      lmsg->any.head.cmd,
2565 				      dmsg_state_relay, NULL);
2566 	}
2567 	if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2568 		bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2569 		      lmsg->hdr_size - sizeof(lmsg->any.head));
2570 	}
2571 	rmsg->any.head.error = lmsg->any.head.error;
2572 	rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2573 	rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2574 	rmsg->aux_size = lmsg->aux_size;
2575 	rmsg->aux_data = lmsg->aux_data;
2576 	lmsg->aux_data = NULL;
2577 
2578 	dmsg_msg_write(rmsg);
2579 }
2580 
2581 /*
2582  * Cleanup and retire msg after issuing the state callback.  The state
2583  * has already been removed from the RB tree.  The subq and msg must be
2584  * cleaned up.
2585  *
2586  * Called with the iocom mutex held (to handle subq disconnection).
2587  */
2588 void
2589 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2590 {
2591 	dmsg_state_t *state;
2592 
2593 	assert(msg->state->iocom == iocom);
2594 	state = msg->state;
2595 	if (state->flags & DMSG_STATE_ROOT) {
2596 		/*
2597 		 * Free a non-transactional message, there is no state
2598 		 * to worry about.
2599 		 */
2600 		dmsg_msg_free(msg);
2601 	} else if ((state->flags & DMSG_STATE_SUBINSERTED) &&
2602 		   (state->rxcmd & DMSGF_DELETE) &&
2603 		   (state->txcmd & DMSGF_DELETE)) {
2604 		/*
2605 		 * Must disconnect from parent and drop relay.
2606 		 */
2607 		dmsg_subq_delete(state);
2608 		if (state->relay) {
2609 			dmsg_state_drop(state->relay);
2610 			state->relay = NULL;
2611 		}
2612 		dmsg_msg_free(msg);
2613 	} else {
2614 		/*
2615 		 * Message not terminating transaction, leave state intact
2616 		 * and free message if it isn't the CREATE message.
2617 		 */
2618 		dmsg_msg_free(msg);
2619 	}
2620 }
2621 
2622 /*
2623  * Clean up the state after pulling out needed fields and queueing the
2624  * message for transmission.   This occurs in dmsg_msg_write().
2625  *
2626  * Called with the mutex locked.
2627  */
2628 static void
2629 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2630 {
2631 	dmsg_state_t *state;
2632 
2633 	assert(iocom == msg->state->iocom);
2634 	state = msg->state;
2635 
2636 	dmsg_state_hold(state);
2637 
2638 	if (state->flags & DMSG_STATE_ROOT) {
2639 		;
2640 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2641 		/*
2642 		 * Message terminating transaction, destroy the related
2643 		 * state, the original message, and this message (if it
2644 		 * isn't the original message due to a CREATE|DELETE).
2645 		 *
2646 		 * It's possible for governing state to terminate while
2647 		 * sub-transactions still exist.  This is allowed but
2648 		 * will cause sub-transactions to recursively fail.
2649 		 * Further reception of sub-transaction messages will be
2650 		 * impossible because the circuit will no longer exist.
2651 		 * (XXX need code to make sure that happens properly).
2652 		 *
2653 		 * NOTE: It is possible for a fafilure to terminate the
2654 		 *	 state after we have written the message but before
2655 		 *	 we are able to call cleanuptx, so txcmd might already
2656 		 *	 have DMSGF_DELETE set.
2657 		 */
2658 		if ((state->txcmd & DMSGF_DELETE) == 0 &&
2659 		    (state->rxcmd & DMSGF_DELETE)) {
2660 			state->txcmd |= DMSGF_DELETE;
2661 			assert(state->flags & DMSG_STATE_RBINSERTED);
2662 			if (state->txcmd & DMSGF_REPLY) {
2663 				assert(msg->any.head.cmd & DMSGF_REPLY);
2664 				RB_REMOVE(dmsg_state_tree,
2665 					  &iocom->staterd_tree, state);
2666 			} else {
2667 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2668 				RB_REMOVE(dmsg_state_tree,
2669 					  &iocom->statewr_tree, state);
2670 			}
2671 			state->flags &= ~DMSG_STATE_RBINSERTED;
2672 			dmsg_subq_delete(state);
2673 
2674 			if (state->relay) {
2675 				dmsg_state_drop(state->relay);
2676 				state->relay = NULL;
2677 			}
2678 			dmsg_state_drop(state);	/* state->rbtree */
2679 		} else if ((state->txcmd & DMSGF_DELETE) == 0) {
2680 			state->txcmd |= DMSGF_DELETE;
2681 		}
2682 	}
2683 
2684 	/*
2685 	 * Deferred abort after transmission.
2686 	 */
2687 	if ((state->flags & (DMSG_STATE_ABORTING | DMSG_STATE_DYING)) &&
2688 	    (state->rxcmd & DMSGF_DELETE) == 0) {
2689 		dmio_printf(iocom, 4,
2690 			    "cleanuptx: state=%p "
2691 			    "executing deferred abort\n",
2692 			    state);
2693 		state->flags &= ~DMSG_STATE_ABORTING;
2694 		dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
2695 	}
2696 
2697 	dmsg_state_drop(state);
2698 }
2699 
2700 /*
2701  * Called with or without locks
2702  */
2703 void
2704 dmsg_state_hold(dmsg_state_t *state)
2705 {
2706 	atomic_add_int(&state->refs, 1);
2707 }
2708 
2709 void
2710 dmsg_state_drop(dmsg_state_t *state)
2711 {
2712 	assert(state->refs > 0);
2713 	if (atomic_fetchadd_int(&state->refs, -1) == 1)
2714 		dmsg_state_free(state);
2715 }
2716 
2717 /*
2718  * Called with iocom locked
2719  */
2720 static void
2721 dmsg_state_free(dmsg_state_t *state)
2722 {
2723 	atomic_add_int(&dmsg_state_count, -1);
2724 	dmio_printf(state->iocom, 5, "terminate state %p\n", state);
2725 	assert((state->flags & (DMSG_STATE_ROOT |
2726 				DMSG_STATE_SUBINSERTED |
2727 				DMSG_STATE_RBINSERTED)) == 0);
2728 	assert(TAILQ_EMPTY(&state->subq));
2729 	assert(state->refs == 0);
2730 	if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2731 		closefrom(3);
2732 	assert(state->any.any == NULL);
2733 	free(state);
2734 }
2735 
2736 /*
2737  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2738  * header is not adjusted, just the core header.
2739  */
2740 void
2741 dmsg_bswap_head(dmsg_hdr_t *head)
2742 {
2743 	head->magic	= bswap16(head->magic);
2744 	head->reserved02 = bswap16(head->reserved02);
2745 	head->salt	= bswap32(head->salt);
2746 
2747 	head->msgid	= bswap64(head->msgid);
2748 	head->circuit	= bswap64(head->circuit);
2749 	head->reserved18= bswap64(head->reserved18);
2750 
2751 	head->cmd	= bswap32(head->cmd);
2752 	head->aux_crc	= bswap32(head->aux_crc);
2753 	head->aux_bytes	= bswap32(head->aux_bytes);
2754 	head->error	= bswap32(head->error);
2755 	head->aux_descr = bswap64(head->aux_descr);
2756 	head->reserved38= bswap32(head->reserved38);
2757 	head->hdr_crc	= bswap32(head->hdr_crc);
2758 }
2759