xref: /dragonfly/lib/libdmsg/msg.c (revision d4ef6694)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "dmsg_local.h"
37 
/*
 * Debug switch.  When non-zero, dmsg_iocom_core() prints every received
 * message to stderr before dispatching it.
 */
int DMsgDebugOpt;

/*
 * Counter bumped atomically each time dmsg_msg_alloc_locked() allocates
 * a new transaction state.
 */
int dmsg_state_count;

/*
 * Forward declarations for file-local helpers.
 */
static int dmsg_state_msgrx(dmsg_msg_t *msg);
static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
static void dmsg_msg_free_locked(dmsg_msg_t *msg);
static void dmsg_state_free(dmsg_state_t *state);
static void dmsg_msg_simulate_failure(dmsg_state_t *state, int error);

/*
 * Instantiate the red-black tree routines for dmsg_state_tree, linked
 * through the rbnode field and ordered by dmsg_state_cmp().
 */
RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
48 
49 /*
50  * STATE TREE - Represents open transactions which are indexed by their
51  *		{ msgid } relative to the governing iocom.
52  */
53 int
54 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
55 {
56 	if (state1->msgid < state2->msgid)
57 		return(-1);
58 	if (state1->msgid > state2->msgid)
59 		return(1);
60 	return(0);
61 }
62 
63 /*
64  * Initialize a low-level ioq
65  */
66 void
67 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
68 {
69 	bzero(ioq, sizeof(*ioq));
70 	ioq->state = DMSG_MSGQ_STATE_HEADER1;
71 	TAILQ_INIT(&ioq->msgq);
72 }
73 
74 /*
75  * Cleanup queue.
76  *
77  * caller holds iocom->mtx.
78  */
79 void
80 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
81 {
82 	dmsg_msg_t *msg;
83 
84 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
85 		assert(0);	/* shouldn't happen */
86 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
87 		dmsg_msg_free(msg);
88 	}
89 	if ((msg = ioq->msg) != NULL) {
90 		ioq->msg = NULL;
91 		dmsg_msg_free(msg);
92 	}
93 }
94 
95 /*
96  * Initialize a low-level communications channel.
97  *
98  * NOTE: The signal_func() is called at least once from the loop and can be
99  *	 re-armed via dmsg_iocom_restate().
100  */
101 void
102 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
103 		   void (*signal_func)(dmsg_iocom_t *iocom),
104 		   void (*rcvmsg_func)(dmsg_msg_t *msg),
105 		   void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
106 		   void (*altmsg_func)(dmsg_iocom_t *iocom))
107 {
108 	struct stat st;
109 
110 	bzero(iocom, sizeof(*iocom));
111 
112 	asprintf(&iocom->label, "iocom-%p", iocom);
113 	iocom->signal_callback = signal_func;
114 	iocom->rcvmsg_callback = rcvmsg_func;
115 	iocom->altmsg_callback = altmsg_func;
116 	iocom->usrmsg_callback = usrmsg_func;
117 
118 	pthread_mutex_init(&iocom->mtx, NULL);
119 	RB_INIT(&iocom->staterd_tree);
120 	RB_INIT(&iocom->statewr_tree);
121 	TAILQ_INIT(&iocom->freeq);
122 	TAILQ_INIT(&iocom->freeq_aux);
123 	TAILQ_INIT(&iocom->txmsgq);
124 	iocom->sock_fd = sock_fd;
125 	iocom->alt_fd = alt_fd;
126 	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
127 	if (signal_func)
128 		iocom->flags |= DMSG_IOCOMF_SWORK;
129 	dmsg_ioq_init(iocom, &iocom->ioq_rx);
130 	dmsg_ioq_init(iocom, &iocom->ioq_tx);
131 	iocom->state0.refs = 1;		/* should never trigger a free */
132 	iocom->state0.iocom = iocom;
133 	iocom->state0.parent = &iocom->state0;
134 	iocom->state0.flags = DMSG_STATE_ROOT;
135 	TAILQ_INIT(&iocom->state0.subq);
136 
137 	if (pipe(iocom->wakeupfds) < 0)
138 		assert(0);
139 	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
140 	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
141 
142 	/*
143 	 * Negotiate session crypto synchronously.  This will mark the
144 	 * connection as error'd if it fails.  If this is a pipe it's
145 	 * a linkage that we set up ourselves to the filesystem and there
146 	 * is no crypto.
147 	 */
148 	if (fstat(sock_fd, &st) < 0)
149 		assert(0);
150 	if (S_ISSOCK(st.st_mode))
151 		dmsg_crypto_negotiate(iocom);
152 
153 	/*
154 	 * Make sure our fds are set to non-blocking for the iocom core.
155 	 */
156 	if (sock_fd >= 0)
157 		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
158 #if 0
159 	/* if line buffered our single fgets() should be fine */
160 	if (alt_fd >= 0)
161 		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
162 #endif
163 }
164 
165 void
166 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
167 {
168 	va_list va;
169 	char *optr;
170 
171 	va_start(va, ctl);
172 	optr = iocom->label;
173 	vasprintf(&iocom->label, ctl, va);
174 	va_end(va);
175 	if (optr)
176 		free(optr);
177 }
178 
179 /*
180  * May only be called from a callback from iocom_core.
181  *
182  * Adjust state machine functions, set flags to guarantee that both
183  * the recevmsg_func and the sendmsg_func is called at least once.
184  */
185 void
186 dmsg_iocom_restate(dmsg_iocom_t *iocom,
187 		   void (*signal_func)(dmsg_iocom_t *),
188 		   void (*rcvmsg_func)(dmsg_msg_t *msg))
189 {
190 	pthread_mutex_lock(&iocom->mtx);
191 	iocom->signal_callback = signal_func;
192 	iocom->rcvmsg_callback = rcvmsg_func;
193 	if (signal_func)
194 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
195 	else
196 		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
197 	pthread_mutex_unlock(&iocom->mtx);
198 }
199 
200 void
201 dmsg_iocom_signal(dmsg_iocom_t *iocom)
202 {
203 	pthread_mutex_lock(&iocom->mtx);
204 	if (iocom->signal_callback)
205 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206 	pthread_mutex_unlock(&iocom->mtx);
207 }
208 
209 /*
210  * Cleanup a terminating iocom.
211  *
212  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
213  * from all possible references to it.
214  */
215 void
216 dmsg_iocom_done(dmsg_iocom_t *iocom)
217 {
218 	dmsg_msg_t *msg;
219 
220 	if (iocom->sock_fd >= 0) {
221 		close(iocom->sock_fd);
222 		iocom->sock_fd = -1;
223 	}
224 	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
225 		close(iocom->alt_fd);
226 		iocom->alt_fd = -1;
227 	}
228 	dmsg_ioq_done(iocom, &iocom->ioq_rx);
229 	dmsg_ioq_done(iocom, &iocom->ioq_tx);
230 	while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
231 		TAILQ_REMOVE(&iocom->freeq, msg, qentry);
232 		free(msg);
233 	}
234 	while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
235 		TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
236 		free(msg->aux_data);
237 		msg->aux_data = NULL;
238 		free(msg);
239 	}
240 	if (iocom->wakeupfds[0] >= 0) {
241 		close(iocom->wakeupfds[0]);
242 		iocom->wakeupfds[0] = -1;
243 	}
244 	if (iocom->wakeupfds[1] >= 0) {
245 		close(iocom->wakeupfds[1]);
246 		iocom->wakeupfds[1] = -1;
247 	}
248 	pthread_mutex_destroy(&iocom->mtx);
249 }
250 
251 /*
252  * Allocate a new message using the specified transaction state.
253  *
254  * If CREATE is set a new transaction is allocated relative to the passed-in
255  * transaction (the 'state' argument becomes pstate).
256  *
257  * If CREATE is not set the message is associated with the passed-in
258  * transaction.
259  */
260 dmsg_msg_t *
261 dmsg_msg_alloc(dmsg_state_t *state,
262 	       size_t aux_size, uint32_t cmd,
263 	       void (*func)(dmsg_msg_t *), void *data)
264 {
265 	dmsg_iocom_t *iocom = state->iocom;
266 	dmsg_msg_t *msg;
267 
268 	pthread_mutex_lock(&iocom->mtx);
269 	msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
270 	pthread_mutex_unlock(&iocom->mtx);
271 
272 	return msg;
273 }
274 
275 dmsg_msg_t *
276 dmsg_msg_alloc_locked(dmsg_state_t *state,
277 	       size_t aux_size, uint32_t cmd,
278 	       void (*func)(dmsg_msg_t *), void *data)
279 {
280 	dmsg_iocom_t *iocom = state->iocom;
281 	dmsg_state_t *pstate;
282 	dmsg_msg_t *msg;
283 	int hbytes;
284 	size_t aligned_size;
285 
286 #if 0
287 	if (aux_size) {
288 		aligned_size = DMSG_DOALIGN(aux_size);
289 		if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
290 			TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
291 	} else {
292 		aligned_size = 0;
293 		if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
294 			TAILQ_REMOVE(&iocom->freeq, msg, qentry);
295 	}
296 #endif
297 	aligned_size = DMSG_DOALIGN(aux_size);
298 	msg = NULL;
299 	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
300 		/*
301 		 * When CREATE is set without REPLY the caller is
302 		 * initiating a new transaction stacked under the specified
303 		 * circuit.
304 		 *
305 		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
306 		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
307 		 */
308 		pstate = state;
309 		state = malloc(sizeof(*state));
310 		atomic_add_int(&dmsg_state_count, 1);
311 		bzero(state, sizeof(*state));
312 		TAILQ_INIT(&state->subq);
313 		dmsg_state_hold(pstate);
314 		state->refs = 1;
315 		state->parent = pstate;
316 		state->iocom = iocom;
317 		state->flags = DMSG_STATE_DYNAMIC;
318 		state->msgid = (uint64_t)(uintptr_t)state;
319 		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
320 		state->rxcmd = DMSGF_REPLY;
321 		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
322 		state->func = func;
323 		state->any.any = data;
324 
325 		RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
326 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
327 		state->flags |= DMSG_STATE_INSERTED;
328 	} else {
329 		/*
330 		 * Otherwise the message is transmitted over the existing
331 		 * open transaction.
332 		 */
333 		pstate = state->parent;
334 	}
335 
336 	/* XXX SMP race for state */
337 	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
338 	if (msg == NULL) {
339 		msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
340 		bzero(msg, offsetof(struct dmsg_msg, any.head));
341 		*(int *)((char *)msg +
342 			 offsetof(struct dmsg_msg, any.head) + hbytes) =
343 				 0x71B2C3D4;
344 #if 0
345 		msg = malloc(sizeof(*msg));
346 		bzero(msg, sizeof(*msg));
347 #endif
348 	}
349 
350 	/*
351 	 * [re]allocate the auxillary data buffer.  The caller knows that
352 	 * a size-aligned buffer will be allocated but we do not want to
353 	 * force the caller to zero any tail piece, so we do that ourself.
354 	 */
355 	if (msg->aux_size != aux_size) {
356 		if (msg->aux_data) {
357 			free(msg->aux_data);
358 			msg->aux_data = NULL;
359 			msg->aux_size = 0;
360 		}
361 		if (aux_size) {
362 			msg->aux_data = malloc(aligned_size);
363 			msg->aux_size = aux_size;
364 			if (aux_size != aligned_size) {
365 				bzero(msg->aux_data + aux_size,
366 				      aligned_size - aux_size);
367 			}
368 		}
369 	}
370 
371 	/*
372 	 * Set REVTRANS if the transaction was remotely initiated
373 	 * Set REVCIRC if the circuit was remotely initiated
374 	 */
375 	if (state->flags & DMSG_STATE_OPPOSITE)
376 		cmd |= DMSGF_REVTRANS;
377 	if (pstate->flags & DMSG_STATE_OPPOSITE)
378 		cmd |= DMSGF_REVCIRC;
379 
380 	/*
381 	 * Finish filling out the header.
382 	 */
383 	if (hbytes)
384 		bzero(&msg->any.head, hbytes);
385 	msg->hdr_size = hbytes;
386 	msg->any.head.magic = DMSG_HDR_MAGIC;
387 	msg->any.head.cmd = cmd;
388 	msg->any.head.aux_descr = 0;
389 	msg->any.head.aux_crc = 0;
390 	msg->any.head.msgid = state->msgid;
391 	msg->any.head.circuit = pstate->msgid;
392 	msg->state = state;
393 
394 	return (msg);
395 }
396 
397 /*
398  * Free a message so it can be reused afresh.
399  *
400  * NOTE: aux_size can be 0 with a non-NULL aux_data.
401  */
402 static
403 void
404 dmsg_msg_free_locked(dmsg_msg_t *msg)
405 {
406 	/*dmsg_iocom_t *iocom = msg->iocom;*/
407 #if 1
408 	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
409 	if (*(int *)((char *)msg +
410 		     offsetof(struct  dmsg_msg, any.head) + hbytes) !=
411 	     0x71B2C3D4) {
412 		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
413 		assert(0);
414 	}
415 #endif
416 	msg->state = NULL;	/* safety */
417 	if (msg->aux_data) {
418 		free(msg->aux_data);
419 		msg->aux_data = NULL;
420 	}
421 	msg->aux_size = 0;
422 	free (msg);
423 #if 0
424 	if (msg->aux_data)
425 		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
426 	else
427 		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
428 #endif
429 }
430 
431 void
432 dmsg_msg_free(dmsg_msg_t *msg)
433 {
434 	dmsg_iocom_t *iocom = msg->state->iocom;
435 
436 	pthread_mutex_lock(&iocom->mtx);
437 	dmsg_msg_free_locked(msg);
438 	pthread_mutex_unlock(&iocom->mtx);
439 }
440 
441 /*
442  * I/O core loop for an iocom.
443  *
444  * Thread localized, iocom->mtx not held.
445  */
446 void
447 dmsg_iocom_core(dmsg_iocom_t *iocom)
448 {
449 	struct pollfd fds[3];
450 	char dummybuf[256];
451 	dmsg_msg_t *msg;
452 	int timeout;
453 	int count;
454 	int wi;	/* wakeup pipe */
455 	int si;	/* socket */
456 	int ai;	/* alt bulk path socket */
457 
458 	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
459 		/*
460 		 * These iocom->flags are only manipulated within the
461 		 * context of the current thread.  However, modifications
462 		 * still require atomic ops.
463 		 */
464 		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
465 				     DMSG_IOCOMF_WWORK |
466 				     DMSG_IOCOMF_PWORK |
467 				     DMSG_IOCOMF_SWORK |
468 				     DMSG_IOCOMF_ARWORK |
469 				     DMSG_IOCOMF_AWWORK)) == 0) {
470 			/*
471 			 * Only poll if no immediate work is pending.
472 			 * Otherwise we are just wasting our time calling
473 			 * poll.
474 			 */
475 			timeout = 5000;
476 
477 			count = 0;
478 			wi = -1;
479 			si = -1;
480 			ai = -1;
481 
482 			/*
483 			 * Always check the inter-thread pipe, e.g.
484 			 * for iocom->txmsgq work.
485 			 */
486 			wi = count++;
487 			fds[wi].fd = iocom->wakeupfds[0];
488 			fds[wi].events = POLLIN;
489 			fds[wi].revents = 0;
490 
491 			/*
492 			 * Check the socket input/output direction as
493 			 * requested
494 			 */
495 			if (iocom->flags & (DMSG_IOCOMF_RREQ |
496 					    DMSG_IOCOMF_WREQ)) {
497 				si = count++;
498 				fds[si].fd = iocom->sock_fd;
499 				fds[si].events = 0;
500 				fds[si].revents = 0;
501 
502 				if (iocom->flags & DMSG_IOCOMF_RREQ)
503 					fds[si].events |= POLLIN;
504 				if (iocom->flags & DMSG_IOCOMF_WREQ)
505 					fds[si].events |= POLLOUT;
506 			}
507 
508 			/*
509 			 * Check the alternative fd for work.
510 			 */
511 			if (iocom->alt_fd >= 0) {
512 				ai = count++;
513 				fds[ai].fd = iocom->alt_fd;
514 				fds[ai].events = POLLIN;
515 				fds[ai].revents = 0;
516 			}
517 			poll(fds, count, timeout);
518 
519 			if (wi >= 0 && (fds[wi].revents & POLLIN))
520 				atomic_set_int(&iocom->flags,
521 					       DMSG_IOCOMF_PWORK);
522 			if (si >= 0 && (fds[si].revents & POLLIN))
523 				atomic_set_int(&iocom->flags,
524 					       DMSG_IOCOMF_RWORK);
525 			if (si >= 0 && (fds[si].revents & POLLOUT))
526 				atomic_set_int(&iocom->flags,
527 					       DMSG_IOCOMF_WWORK);
528 			if (wi >= 0 && (fds[wi].revents & POLLOUT))
529 				atomic_set_int(&iocom->flags,
530 					       DMSG_IOCOMF_WWORK);
531 			if (ai >= 0 && (fds[ai].revents & POLLIN))
532 				atomic_set_int(&iocom->flags,
533 					       DMSG_IOCOMF_ARWORK);
534 		} else {
535 			/*
536 			 * Always check the pipe
537 			 */
538 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
539 		}
540 
541 		if (iocom->flags & DMSG_IOCOMF_SWORK) {
542 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
543 			iocom->signal_callback(iocom);
544 		}
545 
546 		/*
547 		 * Pending message queues from other threads wake us up
548 		 * with a write to the wakeupfds[] pipe.  We have to clear
549 		 * the pipe with a dummy read.
550 		 */
551 		if (iocom->flags & DMSG_IOCOMF_PWORK) {
552 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
553 			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
554 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
555 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
556 			if (TAILQ_FIRST(&iocom->txmsgq))
557 				dmsg_iocom_flush1(iocom);
558 		}
559 
560 		/*
561 		 * Message write sequencing
562 		 */
563 		if (iocom->flags & DMSG_IOCOMF_WWORK)
564 			dmsg_iocom_flush1(iocom);
565 
566 		/*
567 		 * Message read sequencing.  Run this after the write
568 		 * sequencing in case the write sequencing allowed another
569 		 * auto-DELETE to occur on the read side.
570 		 */
571 		if (iocom->flags & DMSG_IOCOMF_RWORK) {
572 			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
573 			       (msg = dmsg_ioq_read(iocom)) != NULL) {
574 				if (DMsgDebugOpt) {
575 					fprintf(stderr, "receive %s\n",
576 						dmsg_msg_str(msg));
577 				}
578 				iocom->rcvmsg_callback(msg);
579 				dmsg_state_cleanuprx(iocom, msg);
580 			}
581 		}
582 
583 		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
584 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
585 			iocom->altmsg_callback(iocom);
586 		}
587 	}
588 }
589 
590 /*
591  * Make sure there's enough room in the FIFO to hold the
592  * needed data.
593  *
594  * Assume worst case encrypted form is 2x the size of the
595  * plaintext equivalent.
596  */
597 static
598 size_t
599 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
600 {
601 	size_t bytes;
602 	size_t nmax;
603 
604 	bytes = ioq->fifo_cdx - ioq->fifo_beg;
605 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
606 	if (bytes + nmax / 2 < needed) {
607 		if (bytes) {
608 			bcopy(ioq->buf + ioq->fifo_beg,
609 			      ioq->buf,
610 			      bytes);
611 		}
612 		ioq->fifo_cdx -= ioq->fifo_beg;
613 		ioq->fifo_beg = 0;
614 		if (ioq->fifo_cdn < ioq->fifo_end) {
615 			bcopy(ioq->buf + ioq->fifo_cdn,
616 			      ioq->buf + ioq->fifo_cdx,
617 			      ioq->fifo_end - ioq->fifo_cdn);
618 		}
619 		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
620 		ioq->fifo_cdn = ioq->fifo_cdx;
621 		nmax = sizeof(ioq->buf) - ioq->fifo_end;
622 	}
623 	return(nmax);
624 }
625 
626 /*
627  * Read the next ready message from the ioq, issuing I/O if needed.
628  * Caller should retry on a read-event when NULL is returned.
629  *
630  * If an error occurs during reception a DMSG_LNK_ERROR msg will
631  * be returned for each open transaction, then the ioq and iocom
632  * will be errored out and a non-transactional DMSG_LNK_ERROR
633  * msg will be returned as the final message.  The caller should not call
634  * us again after the final message is returned.
635  *
636  * Thread localized, iocom->mtx not held.
637  */
638 dmsg_msg_t *
639 dmsg_ioq_read(dmsg_iocom_t *iocom)
640 {
641 	dmsg_ioq_t *ioq = &iocom->ioq_rx;
642 	dmsg_msg_t *msg;
643 	dmsg_state_t *state;
644 	dmsg_hdr_t *head;
645 	ssize_t n;
646 	size_t bytes;
647 	size_t nmax;
648 	uint32_t aux_size;
649 	uint32_t xcrc32;
650 	int error;
651 
652 again:
653 	/*
654 	 * If a message is already pending we can just remove and
655 	 * return it.  Message state has already been processed.
656 	 * (currently not implemented)
657 	 */
658 	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
659 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
660 		return (msg);
661 	}
662 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
663 
664 	/*
665 	 * If the stream is errored out we stop processing it.
666 	 */
667 	if (ioq->error)
668 		goto skip;
669 
670 	/*
671 	 * Message read in-progress (msg is NULL at the moment).  We don't
672 	 * allocate a msg until we have its core header.
673 	 */
674 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
675 	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
676 	msg = ioq->msg;
677 
678 	switch(ioq->state) {
679 	case DMSG_MSGQ_STATE_HEADER1:
680 		/*
681 		 * Load the primary header, fail on any non-trivial read
682 		 * error or on EOF.  Since the primary header is the same
683 		 * size is the message alignment it will never straddle
684 		 * the end of the buffer.
685 		 */
686 		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
687 		if (bytes < sizeof(msg->any.head)) {
688 			n = read(iocom->sock_fd,
689 				 ioq->buf + ioq->fifo_end,
690 				 nmax);
691 			if (n <= 0) {
692 				if (n == 0) {
693 					ioq->error = DMSG_IOQ_ERROR_EOF;
694 					break;
695 				}
696 				if (errno != EINTR &&
697 				    errno != EINPROGRESS &&
698 				    errno != EAGAIN) {
699 					ioq->error = DMSG_IOQ_ERROR_SOCK;
700 					break;
701 				}
702 				n = 0;
703 				/* fall through */
704 			}
705 			ioq->fifo_end += (size_t)n;
706 			nmax -= (size_t)n;
707 		}
708 
709 		/*
710 		 * Decrypt data received so far.  Data will be decrypted
711 		 * in-place but might create gaps in the FIFO.  Partial
712 		 * blocks are not immediately decrypted.
713 		 *
714 		 * WARNING!  The header might be in the wrong endian, we
715 		 *	     do not fix it up until we get the entire
716 		 *	     extended header.
717 		 */
718 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
719 			dmsg_crypto_decrypt(iocom, ioq);
720 		} else {
721 			ioq->fifo_cdx = ioq->fifo_end;
722 			ioq->fifo_cdn = ioq->fifo_end;
723 		}
724 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
725 
726 		/*
727 		 * Insufficient data accumulated (msg is NULL, caller will
728 		 * retry on event).
729 		 */
730 		assert(msg == NULL);
731 		if (bytes < sizeof(msg->any.head))
732 			break;
733 
734 		/*
735 		 * Check and fixup the core header.  Note that the icrc
736 		 * has to be calculated before any fixups, but the crc
737 		 * fields in the msg may have to be swapped like everything
738 		 * else.
739 		 */
740 		head = (void *)(ioq->buf + ioq->fifo_beg);
741 		if (head->magic != DMSG_HDR_MAGIC &&
742 		    head->magic != DMSG_HDR_MAGIC_REV) {
743 			fprintf(stderr, "%s: head->magic is bad %02x\n",
744 				iocom->label, head->magic);
745 			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
746 				fprintf(stderr, "(on encrypted link)\n");
747 			ioq->error = DMSG_IOQ_ERROR_SYNC;
748 			break;
749 		}
750 
751 		/*
752 		 * Calculate the full header size and aux data size
753 		 */
754 		if (head->magic == DMSG_HDR_MAGIC_REV) {
755 			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
756 				      DMSG_ALIGN;
757 			aux_size = bswap32(head->aux_bytes);
758 		} else {
759 			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
760 				      DMSG_ALIGN;
761 			aux_size = head->aux_bytes;
762 		}
763 		ioq->abytes = DMSG_DOALIGN(aux_size);
764 		ioq->unaligned_aux_size = aux_size;
765 		if (ioq->hbytes < sizeof(msg->any.head) ||
766 		    ioq->hbytes > sizeof(msg->any) ||
767 		    ioq->abytes > DMSG_AUX_MAX) {
768 			ioq->error = DMSG_IOQ_ERROR_FIELD;
769 			break;
770 		}
771 
772 		/*
773 		 * Allocate the message, the next state will fill it in.
774 		 *
775 		 * NOTE: The aux_data buffer will be sized to an aligned
776 		 *	 value and the aligned remainder zero'd for
777 		 *	 convenience.
778 		 *
779 		 * NOTE: Supply dummy state and a degenerate cmd without
780 		 *	 CREATE set.  The message will temporarily be
781 		 *	 associated with state0 until later post-processing.
782 		 */
783 		msg = dmsg_msg_alloc(&iocom->state0, aux_size,
784 				     ioq->hbytes / DMSG_ALIGN,
785 				     NULL, NULL);
786 		ioq->msg = msg;
787 
788 		/*
789 		 * Fall through to the next state.  Make sure that the
790 		 * extended header does not straddle the end of the buffer.
791 		 * We still want to issue larger reads into our buffer,
792 		 * book-keeping is easier if we don't bcopy() yet.
793 		 *
794 		 * Make sure there is enough room for bloated encrypt data.
795 		 */
796 		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
797 		ioq->state = DMSG_MSGQ_STATE_HEADER2;
798 		/* fall through */
799 	case DMSG_MSGQ_STATE_HEADER2:
800 		/*
801 		 * Fill out the extended header.
802 		 */
803 		assert(msg != NULL);
804 		if (bytes < ioq->hbytes) {
805 			n = read(iocom->sock_fd,
806 				 ioq->buf + ioq->fifo_end,
807 				 nmax);
808 			if (n <= 0) {
809 				if (n == 0) {
810 					ioq->error = DMSG_IOQ_ERROR_EOF;
811 					break;
812 				}
813 				if (errno != EINTR &&
814 				    errno != EINPROGRESS &&
815 				    errno != EAGAIN) {
816 					ioq->error = DMSG_IOQ_ERROR_SOCK;
817 					break;
818 				}
819 				n = 0;
820 				/* fall through */
821 			}
822 			ioq->fifo_end += (size_t)n;
823 			nmax -= (size_t)n;
824 		}
825 
826 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
827 			dmsg_crypto_decrypt(iocom, ioq);
828 		} else {
829 			ioq->fifo_cdx = ioq->fifo_end;
830 			ioq->fifo_cdn = ioq->fifo_end;
831 		}
832 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
833 
834 		/*
835 		 * Insufficient data accumulated (set msg NULL so caller will
836 		 * retry on event).
837 		 */
838 		if (bytes < ioq->hbytes) {
839 			msg = NULL;
840 			break;
841 		}
842 
843 		/*
844 		 * Calculate the extended header, decrypt data received
845 		 * so far.  Handle endian-conversion for the entire extended
846 		 * header.
847 		 */
848 		head = (void *)(ioq->buf + ioq->fifo_beg);
849 
850 		/*
851 		 * Check the CRC.
852 		 */
853 		if (head->magic == DMSG_HDR_MAGIC_REV)
854 			xcrc32 = bswap32(head->hdr_crc);
855 		else
856 			xcrc32 = head->hdr_crc;
857 		head->hdr_crc = 0;
858 		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
859 			ioq->error = DMSG_IOQ_ERROR_XCRC;
860 			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
861 				xcrc32, dmsg_icrc32(head, ioq->hbytes),
862 				dmsg_msg_str(msg));
863 			assert(0);
864 			break;
865 		}
866 		head->hdr_crc = xcrc32;
867 
868 		if (head->magic == DMSG_HDR_MAGIC_REV) {
869 			dmsg_bswap_head(head);
870 		}
871 
872 		/*
873 		 * Copy the extended header into the msg and adjust the
874 		 * FIFO.
875 		 */
876 		bcopy(head, &msg->any, ioq->hbytes);
877 
878 		/*
879 		 * We are either done or we fall-through.
880 		 */
881 		if (ioq->abytes == 0) {
882 			ioq->fifo_beg += ioq->hbytes;
883 			break;
884 		}
885 
886 		/*
887 		 * Must adjust bytes (and the state) when falling through.
888 		 * nmax doesn't change.
889 		 */
890 		ioq->fifo_beg += ioq->hbytes;
891 		bytes -= ioq->hbytes;
892 		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
893 		/* fall through */
894 	case DMSG_MSGQ_STATE_AUXDATA1:
895 		/*
896 		 * Copy the partial or complete [decrypted] payload from
897 		 * remaining bytes in the FIFO in order to optimize the
898 		 * makeroom call in the AUXDATA2 state.  We have to
899 		 * fall-through either way so we can check the crc.
900 		 *
901 		 * msg->aux_size tracks our aux data.
902 		 *
903 		 * (Lets not complicate matters if the data is encrypted,
904 		 *  since the data in-stream is not the same size as the
905 		 *  data decrypted).
906 		 */
907 		if (bytes >= ioq->abytes) {
908 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
909 			      ioq->abytes);
910 			msg->aux_size = ioq->abytes;
911 			ioq->fifo_beg += ioq->abytes;
912 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
913 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
914 			bytes -= ioq->abytes;
915 		} else if (bytes) {
916 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
917 			      bytes);
918 			msg->aux_size = bytes;
919 			ioq->fifo_beg += bytes;
920 			if (ioq->fifo_cdx < ioq->fifo_beg)
921 				ioq->fifo_cdx = ioq->fifo_beg;
922 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
923 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
924 			bytes = 0;
925 		} else {
926 			msg->aux_size = 0;
927 		}
928 		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
929 		/* fall through */
930 	case DMSG_MSGQ_STATE_AUXDATA2:
931 		/*
932 		 * Make sure there is enough room for more data.
933 		 */
934 		assert(msg);
935 		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
936 
937 		/*
938 		 * Read and decrypt more of the payload.
939 		 */
940 		if (msg->aux_size < ioq->abytes) {
941 			assert(bytes == 0);
942 			n = read(iocom->sock_fd,
943 				 ioq->buf + ioq->fifo_end,
944 				 nmax);
945 			if (n <= 0) {
946 				if (n == 0) {
947 					ioq->error = DMSG_IOQ_ERROR_EOF;
948 					break;
949 				}
950 				if (errno != EINTR &&
951 				    errno != EINPROGRESS &&
952 				    errno != EAGAIN) {
953 					ioq->error = DMSG_IOQ_ERROR_SOCK;
954 					break;
955 				}
956 				n = 0;
957 				/* fall through */
958 			}
959 			ioq->fifo_end += (size_t)n;
960 			nmax -= (size_t)n;
961 		}
962 
963 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
964 			dmsg_crypto_decrypt(iocom, ioq);
965 		} else {
966 			ioq->fifo_cdx = ioq->fifo_end;
967 			ioq->fifo_cdn = ioq->fifo_end;
968 		}
969 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
970 
971 		if (bytes > ioq->abytes - msg->aux_size)
972 			bytes = ioq->abytes - msg->aux_size;
973 
974 		if (bytes) {
975 			bcopy(ioq->buf + ioq->fifo_beg,
976 			      msg->aux_data + msg->aux_size,
977 			      bytes);
978 			msg->aux_size += bytes;
979 			ioq->fifo_beg += bytes;
980 		}
981 
982 		/*
983 		 * Insufficient data accumulated (set msg NULL so caller will
984 		 * retry on event).
985 		 *
986 		 * Assert the auxillary data size is correct, then record the
987 		 * original unaligned size from the message header.
988 		 */
989 		if (msg->aux_size < ioq->abytes) {
990 			msg = NULL;
991 			break;
992 		}
993 		assert(msg->aux_size == ioq->abytes);
994 		msg->aux_size = ioq->unaligned_aux_size;
995 
996 		/*
997 		 * Check aux_crc, then we are done.  Note that the crc
998 		 * is calculated over the aligned size, not the actual
999 		 * size.
1000 		 */
1001 		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
1002 		if (xcrc32 != msg->any.head.aux_crc) {
1003 			ioq->error = DMSG_IOQ_ERROR_ACRC;
1004 			fprintf(stderr,
1005 				"iocom: ACRC error %08x vs %08x "
1006 				"msgid %016jx msgcmd %08x auxsize %d\n",
1007 				xcrc32,
1008 				msg->any.head.aux_crc,
1009 				(intmax_t)msg->any.head.msgid,
1010 				msg->any.head.cmd,
1011 				msg->any.head.aux_bytes);
1012 			break;
1013 		}
1014 		break;
1015 	case DMSG_MSGQ_STATE_ERROR:
1016 		/*
1017 		 * Continued calls to drain recorded transactions (returning
1018 		 * a LNK_ERROR for each one), before we return the final
1019 		 * LNK_ERROR.
1020 		 */
1021 		assert(msg == NULL);
1022 		break;
1023 	default:
1024 		/*
1025 		 * We don't double-return errors, the caller should not
1026 		 * have called us again after getting an error msg.
1027 		 */
1028 		assert(0);
1029 		break;
1030 	}
1031 
1032 	/*
1033 	 * Check the message sequence.  The iv[] should prevent any
1034 	 * possibility of a replay but we add this check anyway.
1035 	 */
1036 	if (msg && ioq->error == 0) {
1037 		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1038 			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1039 		} else {
1040 			++ioq->seq;
1041 		}
1042 	}
1043 
1044 	/*
1045 	 * Handle error, RREQ, or completion
1046 	 *
1047 	 * NOTE: nmax and bytes are invalid at this point, we don't bother
1048 	 *	 to update them when breaking out.
1049 	 */
1050 	if (ioq->error) {
1051 		dmsg_state_t *tmp_state;
1052 skip:
1053 		fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1054 		/*
1055 		 * An unrecoverable error causes all active receive
1056 		 * transactions to be terminated with a LNK_ERROR message.
1057 		 *
1058 		 * Once all active transactions are exhausted we set the
1059 		 * iocom ERROR flag and return a non-transactional LNK_ERROR
1060 		 * message, which should cause master processing loops to
1061 		 * terminate.
1062 		 */
1063 		assert(ioq->msg == msg);
1064 		if (msg) {
1065 			dmsg_msg_free(msg);
1066 			ioq->msg = NULL;
1067 			msg = NULL;
1068 		}
1069 
1070 		/*
1071 		 * No more I/O read processing
1072 		 */
1073 		ioq->state = DMSG_MSGQ_STATE_ERROR;
1074 
1075 		/*
1076 		 * Simulate a remote LNK_ERROR DELETE msg for any open
1077 		 * transactions, ending with a final non-transactional
1078 		 * LNK_ERROR (that the session can detect) when no
1079 		 * transactions remain.
1080 		 *
1081 		 * NOTE: Temporarily supply state0 and a degenerate cmd
1082 		 *	 without CREATE set.  The real state will be
1083 		 *	 assigned in the loop.
1084 		 *
1085 		 * NOTE: We are simulating a received message using our
1086 		 *	 side of the state, so the DMSGF_REV* bits have
1087 		 *	 to be reversed.
1088 		 */
1089 		pthread_mutex_lock(&iocom->mtx);
1090 		dmsg_iocom_drain(iocom);
1091 
1092 		tmp_state = NULL;
1093 		RB_FOREACH(state, dmsg_state_tree, &iocom->staterd_tree) {
1094 			atomic_set_int(&state->flags, DMSG_STATE_DYING);
1095 			if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1096 				tmp_state = state;
1097 		}
1098 		RB_FOREACH(state, dmsg_state_tree, &iocom->statewr_tree) {
1099 			atomic_set_int(&state->flags, DMSG_STATE_DYING);
1100 			if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1101 				tmp_state = state;
1102 		}
1103 
1104 		if (tmp_state) {
1105 			dmsg_msg_simulate_failure(tmp_state, ioq->error);
1106 		} else {
1107 			dmsg_msg_simulate_failure(&iocom->state0, ioq->error);
1108 		}
1109 		pthread_mutex_unlock(&iocom->mtx);
1110 		if (TAILQ_FIRST(&ioq->msgq))
1111 			goto again;
1112 
1113 #if 0
1114 		/*
1115 		 * For the iocom error case we want to set RWORK to indicate
1116 		 * that more messages might be pending.
1117 		 *
1118 		 * It is possible to return NULL when there is more work to
1119 		 * do because each message has to be DELETEd in both
1120 		 * directions before we continue on with the next (though
1121 		 * this could be optimized).  The transmit direction will
1122 		 * re-set RWORK.
1123 		 */
1124 		if (msg)
1125 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1126 #endif
1127 	} else if (msg == NULL) {
1128 		/*
1129 		 * Insufficient data received to finish building the message,
1130 		 * set RREQ and return NULL.
1131 		 *
1132 		 * Leave ioq->msg intact.
1133 		 * Leave the FIFO intact.
1134 		 */
1135 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1136 	} else {
1137 		/*
1138 		 * Continue processing msg.
1139 		 *
1140 		 * The fifo has already been advanced past the message.
1141 		 * Trivially reset the FIFO indices if possible.
1142 		 *
1143 		 * clear the FIFO if it is now empty and set RREQ to wait
1144 		 * for more from the socket.  If the FIFO is not empty set
1145 		 * TWORK to bypass the poll so we loop immediately.
1146 		 */
1147 		if (ioq->fifo_beg == ioq->fifo_cdx &&
1148 		    ioq->fifo_cdn == ioq->fifo_end) {
1149 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1150 			ioq->fifo_cdx = 0;
1151 			ioq->fifo_cdn = 0;
1152 			ioq->fifo_beg = 0;
1153 			ioq->fifo_end = 0;
1154 		} else {
1155 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1156 		}
1157 		ioq->state = DMSG_MSGQ_STATE_HEADER1;
1158 		ioq->msg = NULL;
1159 
1160 		/*
1161 		 * Handle message routing.  Validates non-zero sources
1162 		 * and routes message.  Error will be 0 if the message is
1163 		 * destined for us.
1164 		 *
1165 		 * State processing only occurs for messages destined for us.
1166 		 */
1167 		if (DMsgDebugOpt >= 5) {
1168 			fprintf(stderr,
1169 				"rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1170 				msg->any.head.cmd,
1171 				(intmax_t)msg->any.head.msgid,
1172 				(intmax_t)msg->any.head.circuit);
1173 		}
1174 		error = dmsg_state_msgrx(msg);
1175 
1176 		if (error) {
1177 			/*
1178 			 * Abort-after-closure, throw message away and
1179 			 * start reading another.
1180 			 */
1181 			if (error == DMSG_IOQ_ERROR_EALREADY) {
1182 				dmsg_msg_free(msg);
1183 				goto again;
1184 			}
1185 
1186 			/*
1187 			 * Process real error and throw away message.
1188 			 */
1189 			ioq->error = error;
1190 			goto skip;
1191 		}
1192 		/* no error, not routed.  Fall through and return msg */
1193 	}
1194 	return (msg);
1195 }
1196 
1197 /*
1198  * Calculate the header and data crc's and write a low-level message to
1199  * the connection.  If aux_crc is non-zero the aux_data crc is already
1200  * assumed to have been set.
1201  *
1202  * A non-NULL msg is added to the queue but not necessarily flushed.
1203  * Calling this function with msg == NULL will get a flush going.
1204  *
1205  * (called from iocom_core only)
1206  */
1207 void
1208 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1209 {
1210 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1211 	dmsg_msg_t *msg;
1212 	uint32_t xcrc32;
1213 	size_t hbytes;
1214 	size_t abytes;
1215 	dmsg_msg_queue_t tmpq;
1216 
1217 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1218 	TAILQ_INIT(&tmpq);
1219 	pthread_mutex_lock(&iocom->mtx);
1220 	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1221 		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1222 		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1223 	}
1224 	pthread_mutex_unlock(&iocom->mtx);
1225 
1226 	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1227 		/*
1228 		 * Process terminal connection errors.
1229 		 */
1230 		TAILQ_REMOVE(&tmpq, msg, qentry);
1231 		if (ioq->error) {
1232 			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1233 			++ioq->msgcount;
1234 			continue;
1235 		}
1236 
1237 		/*
1238 		 * Finish populating the msg fields.  The salt ensures that
1239 		 * the iv[] array is ridiculously randomized and we also
1240 		 * re-seed our PRNG every 32768 messages just to be sure.
1241 		 */
1242 		msg->any.head.magic = DMSG_HDR_MAGIC;
1243 		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1244 		++ioq->seq;
1245 		if ((ioq->seq & 32767) == 0)
1246 			srandomdev();
1247 
1248 		/*
1249 		 * Calculate aux_crc if 0, then calculate hdr_crc.
1250 		 */
1251 		if (msg->aux_size && msg->any.head.aux_crc == 0) {
1252 			abytes = DMSG_DOALIGN(msg->aux_size);
1253 			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1254 			msg->any.head.aux_crc = xcrc32;
1255 		}
1256 		msg->any.head.aux_bytes = msg->aux_size;
1257 
1258 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1259 			 DMSG_ALIGN;
1260 		msg->any.head.hdr_crc = 0;
1261 		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1262 
1263 		/*
1264 		 * Enqueue the message (the flush codes handles stream
1265 		 * encryption).
1266 		 */
1267 		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1268 		++ioq->msgcount;
1269 	}
1270 	dmsg_iocom_flush2(iocom);
1271 }
1272 
1273 /*
1274  * Thread localized, iocom->mtx not held by caller.
1275  *
1276  * (called from iocom_core via iocom_flush1 only)
1277  */
1278 void
1279 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1280 {
1281 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1282 	dmsg_msg_t *msg;
1283 	ssize_t n;
1284 	struct iovec iov[DMSG_IOQ_MAXIOVEC];
1285 	size_t nact;
1286 	size_t hbytes;
1287 	size_t abytes;
1288 	size_t hoff;
1289 	size_t aoff;
1290 	int iovcnt;
1291 
1292 	if (ioq->error) {
1293 		dmsg_iocom_drain(iocom);
1294 		return;
1295 	}
1296 
1297 	/*
1298 	 * Pump messages out the connection by building an iovec.
1299 	 *
1300 	 * ioq->hbytes/ioq->abytes tracks how much of the first message
1301 	 * in the queue has been successfully written out, so we can
1302 	 * resume writing.
1303 	 */
1304 	iovcnt = 0;
1305 	nact = 0;
1306 	hoff = ioq->hbytes;
1307 	aoff = ioq->abytes;
1308 
1309 	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1310 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1311 			 DMSG_ALIGN;
1312 		abytes = DMSG_DOALIGN(msg->aux_size);
1313 		assert(hoff <= hbytes && aoff <= abytes);
1314 
1315 		if (hoff < hbytes) {
1316 			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1317 			iov[iovcnt].iov_len = hbytes - hoff;
1318 			nact += hbytes - hoff;
1319 			++iovcnt;
1320 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1321 				break;
1322 		}
1323 		if (aoff < abytes) {
1324 			assert(msg->aux_data != NULL);
1325 			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1326 			iov[iovcnt].iov_len = abytes - aoff;
1327 			nact += abytes - aoff;
1328 			++iovcnt;
1329 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1330 				break;
1331 		}
1332 		hoff = 0;
1333 		aoff = 0;
1334 	}
1335 	if (iovcnt == 0)
1336 		return;
1337 
1338 	/*
1339 	 * Encrypt and write the data.  The crypto code will move the
1340 	 * data into the fifo and adjust the iov as necessary.  If
1341 	 * encryption is disabled the iov is left alone.
1342 	 *
1343 	 * May return a smaller iov (thus a smaller n), with aggregated
1344 	 * chunks.  May reduce nmax to what fits in the FIFO.
1345 	 *
1346 	 * This function sets nact to the number of original bytes now
1347 	 * encrypted, adding to the FIFO some number of bytes that might
1348 	 * be greater depending on the crypto mechanic.  iov[] is adjusted
1349 	 * to point at the FIFO if necessary.
1350 	 *
1351 	 * NOTE: The return value from the writev() is the post-encrypted
1352 	 *	 byte count, not the plaintext count.
1353 	 */
1354 	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1355 		/*
1356 		 * Make sure the FIFO has a reasonable amount of space
1357 		 * left (if not completely full).
1358 		 *
1359 		 * In this situation we are staging the encrypted message
1360 		 * data in the FIFO.  (nact) represents how much plaintext
1361 		 * has been staged, (n) represents how much encrypted data
1362 		 * has been flushed.  The two are independent of each other.
1363 		 */
1364 		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1365 		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1366 			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1367 			      ioq->fifo_end - ioq->fifo_beg);
1368 			ioq->fifo_cdx -= ioq->fifo_beg;
1369 			ioq->fifo_cdn -= ioq->fifo_beg;
1370 			ioq->fifo_end -= ioq->fifo_beg;
1371 			ioq->fifo_beg = 0;
1372 		}
1373 
1374 		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1375 		n = writev(iocom->sock_fd, iov, iovcnt);
1376 		if (n > 0) {
1377 			ioq->fifo_beg += n;
1378 			ioq->fifo_cdn += n;
1379 			ioq->fifo_cdx += n;
1380 			if (ioq->fifo_beg == ioq->fifo_end) {
1381 				ioq->fifo_beg = 0;
1382 				ioq->fifo_cdn = 0;
1383 				ioq->fifo_cdx = 0;
1384 				ioq->fifo_end = 0;
1385 			}
1386 		}
1387 		/*
1388 		 * We don't mess with the nact returned by the crypto_encrypt
1389 		 * call, which represents the filling of the FIFO.  (n) tells
1390 		 * us how much we were able to write from the FIFO.  The two
1391 		 * are different beasts when encrypting.
1392 		 */
1393 	} else {
1394 		/*
1395 		 * In this situation we are not staging the messages to the
1396 		 * FIFO but instead writing them directly from the msg
1397 		 * structure(s), so (nact) is basically (n).
1398 		 */
1399 		n = writev(iocom->sock_fd, iov, iovcnt);
1400 		if (n > 0)
1401 			nact = n;
1402 		else
1403 			nact = 0;
1404 	}
1405 
1406 	/*
1407 	 * Clean out the transmit queue based on what we successfully
1408 	 * sent (nact is the plaintext count).  ioq->hbytes/abytes
1409 	 * represents the portion of the first message previously sent.
1410 	 */
1411 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1412 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1413 			 DMSG_ALIGN;
1414 		abytes = DMSG_DOALIGN(msg->aux_size);
1415 
1416 		if ((size_t)nact < hbytes - ioq->hbytes) {
1417 			ioq->hbytes += nact;
1418 			nact = 0;
1419 			break;
1420 		}
1421 		nact -= hbytes - ioq->hbytes;
1422 		ioq->hbytes = hbytes;
1423 		if ((size_t)nact < abytes - ioq->abytes) {
1424 			ioq->abytes += nact;
1425 			nact = 0;
1426 			break;
1427 		}
1428 		nact -= abytes - ioq->abytes;
1429 		/* ioq->abytes = abytes; optimized out */
1430 
1431 #if 0
1432 		fprintf(stderr,
1433 			"txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1434 			msg->any.head.cmd,
1435 			(intmax_t)msg->any.head.msgid,
1436 			(intmax_t)msg->any.head.circuit);
1437 #endif
1438 
1439 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1440 		--ioq->msgcount;
1441 		ioq->hbytes = 0;
1442 		ioq->abytes = 0;
1443 		dmsg_msg_free(msg);
1444 	}
1445 	assert(nact == 0);
1446 
1447 	/*
1448 	 * Process the return value from the write w/regards to blocking.
1449 	 */
1450 	if (n < 0) {
1451 		if (errno != EINTR &&
1452 		    errno != EINPROGRESS &&
1453 		    errno != EAGAIN) {
1454 			/*
1455 			 * Fatal write error
1456 			 */
1457 			ioq->error = DMSG_IOQ_ERROR_SOCK;
1458 			dmsg_iocom_drain(iocom);
1459 		} else {
1460 			/*
1461 			 * Wait for socket buffer space
1462 			 */
1463 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1464 		}
1465 	} else {
1466 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1467 	}
1468 	if (ioq->error) {
1469 		dmsg_iocom_drain(iocom);
1470 	}
1471 }
1472 
1473 /*
1474  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1475  * write events will occur.  We don't kill read msgs because we want
1476  * the caller to pull off our contrived terminal error msg to detect
1477  * the connection failure.
1478  *
1479  * Localized to iocom_core thread, iocom->mtx not held by caller.
1480  */
1481 void
1482 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1483 {
1484 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1485 	dmsg_msg_t *msg;
1486 
1487 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1488 	ioq->hbytes = 0;
1489 	ioq->abytes = 0;
1490 
1491 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1492 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1493 		--ioq->msgcount;
1494 		dmsg_msg_free(msg);
1495 	}
1496 }
1497 
1498 /*
1499  * Write a message to an iocom, with additional state processing.
1500  */
1501 void
1502 dmsg_msg_write(dmsg_msg_t *msg)
1503 {
1504 	dmsg_iocom_t *iocom = msg->state->iocom;
1505 	dmsg_state_t *state;
1506 	char dummy;
1507 
1508 	pthread_mutex_lock(&iocom->mtx);
1509 	state = msg->state;
1510 
1511 	/*
1512 	 * Make sure the parent transaction is still open in the transmit
1513 	 * direction.  If it isn't the message is dead and we have to
1514 	 * potentially simulate a rxmsg terminating the transaction.
1515 	 */
1516 	if (state->parent->txcmd & DMSGF_DELETE) {
1517 		fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1518 		dmsg_msg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1519 		dmsg_state_cleanuptx(iocom, msg);
1520 		dmsg_msg_free(msg);
1521 		pthread_mutex_unlock(&iocom->mtx);
1522 		return;
1523 	}
1524 
1525 	/*
1526 	 * Process state data into the message as needed, then update the
1527 	 * state based on the message.
1528 	 */
1529 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1530 		/*
1531 		 * Existing transaction (could be reply).  It is also
1532 		 * possible for this to be the first reply (CREATE is set),
1533 		 * in which case we populate state->txcmd.
1534 		 *
1535 		 * state->txcmd is adjusted to hold the final message cmd,
1536 		 * and we also be sure to set the CREATE bit here.  We did
1537 		 * not set it in dmsg_msg_alloc() because that would have
1538 		 * not been serialized (state could have gotten ripped out
1539 		 * from under the message prior to it being transmitted).
1540 		 */
1541 		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1542 		    DMSGF_CREATE) {
1543 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1544 			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1545 		}
1546 		msg->any.head.msgid = state->msgid;
1547 
1548 		if (msg->any.head.cmd & DMSGF_CREATE) {
1549 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1550 		}
1551 	}
1552 	dmsg_state_cleanuptx(iocom, msg);
1553 
1554 #if 0
1555 	fprintf(stderr,
1556 		"MSGWRITE %016jx %08x\n",
1557 		msg->any.head.msgid, msg->any.head.cmd);
1558 #endif
1559 
1560 	/*
1561 	 * Queue it for output, wake up the I/O pthread.  Note that the
1562 	 * I/O thread is responsible for generating the CRCs and encryption.
1563 	 */
1564 	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1565 	dummy = 0;
1566 	write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
1567 	pthread_mutex_unlock(&iocom->mtx);
1568 }
1569 
1570 /*
1571  * iocom->mtx must be held by caller.
1572  */
1573 static
1574 void
1575 dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
1576 {
1577 	dmsg_iocom_t *iocom = state->iocom;
1578 	dmsg_msg_t *msg;
1579 
1580 	msg = NULL;
1581 
1582 	if (state == &iocom->state0) {
1583 		/*
1584 		 * No active local or remote transactions remain.
1585 		 * Generate a final LNK_ERROR and flag EOF.
1586 		 */
1587 		msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1588 					    DMSG_LNK_ERROR,
1589 					    NULL, NULL);
1590 		msg->any.head.error = error;
1591 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1592 		fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1593 	} else if (state->flags & DMSG_STATE_OPPOSITE) {
1594 		/*
1595 		 * Active remote transactions are still present.
1596 		 * Simulate the other end sending us a DELETE.
1597 		 */
1598 		if (state->rxcmd & DMSGF_DELETE) {
1599 			fprintf(stderr,
1600 				"iocom: ioq error(rd) %d sleeping "
1601 				"state %p rxcmd %08x txcmd %08x "
1602 				"func %p\n",
1603 				error, state, state->rxcmd,
1604 				state->txcmd, state->func);
1605 			usleep(100000);	/* XXX */
1606 			atomic_set_int(&iocom->flags,
1607 				       DMSG_IOCOMF_RWORK);
1608 		} else {
1609 			fprintf(stderr, "SIMULATE ERROR1\n");
1610 			msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1611 					     DMSG_LNK_ERROR,
1612 					     NULL, NULL);
1613 			/*state->txcmd |= DMSGF_DELETE;*/
1614 			msg->state = state;
1615 			msg->any.head.error = error;
1616 			msg->any.head.msgid = state->msgid;
1617 			msg->any.head.circuit = state->parent->msgid;
1618 			msg->any.head.cmd |= DMSGF_ABORT |
1619 					     DMSGF_DELETE;
1620 			if ((state->parent->flags &
1621 			     DMSG_STATE_OPPOSITE) == 0) {
1622 				msg->any.head.cmd |= DMSGF_REVCIRC;
1623 			}
1624 		}
1625 	} else {
1626 		/*
1627 		 * Active local transactions are still present.
1628 		 * Simulate the other end sending us a DELETE.
1629 		 */
1630 		if (state->rxcmd & DMSGF_DELETE) {
1631 			fprintf(stderr,
1632 				"iocom: ioq error(wr) %d sleeping "
1633 				"state %p rxcmd %08x txcmd %08x "
1634 				"func %p\n",
1635 				error, state, state->rxcmd,
1636 				state->txcmd, state->func);
1637 			usleep(100000);	/* XXX */
1638 			atomic_set_int(&iocom->flags,
1639 				       DMSG_IOCOMF_RWORK);
1640 		} else {
1641 			fprintf(stderr, "SIMULATE ERROR1\n");
1642 			msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1643 					     DMSG_LNK_ERROR,
1644 					     NULL, NULL);
1645 			msg->state = state;
1646 			msg->any.head.error = error;
1647 			msg->any.head.msgid = state->msgid;
1648 			msg->any.head.circuit = state->parent->msgid;
1649 			msg->any.head.cmd |= DMSGF_ABORT |
1650 					     DMSGF_DELETE |
1651 					     DMSGF_REVTRANS |
1652 					     DMSGF_REPLY;
1653 			if ((state->parent->flags &
1654 			     DMSG_STATE_OPPOSITE) == 0) {
1655 				msg->any.head.cmd |= DMSGF_REVCIRC;
1656 			}
1657 			if ((state->rxcmd & DMSGF_CREATE) == 0)
1658 				msg->any.head.cmd |= DMSGF_CREATE;
1659 		}
1660 	}
1661 	if (msg) {
1662 		TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1663 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1664 	}
1665 }
1666 
1667 /*
1668  * This is a shortcut to formulate a reply to msg with a simple error code,
1669  * It can reply to and terminate a transaction, or it can reply to a one-way
1670  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1671  * the error code (which can be 0).  Not all transactions are terminated
1672  * with DMSG_LNK_ERROR status (the low level only cares about the
1673  * MSGF_DELETE flag), but most are.
1674  *
1675  * Replies to one-way messages are a bit of an oxymoron but the feature
1676  * is used by the debug (DBG) protocol.
1677  *
1678  * The reply contains no extended data.
1679  */
1680 void
1681 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1682 {
1683 	dmsg_state_t *state = msg->state;
1684 	dmsg_msg_t *nmsg;
1685 	uint32_t cmd;
1686 
1687 	/*
1688 	 * Reply with a simple error code and terminate the transaction.
1689 	 */
1690 	cmd = DMSG_LNK_ERROR;
1691 
1692 	/*
1693 	 * Check if our direction has even been initiated yet, set CREATE.
1694 	 *
1695 	 * Check what direction this is (command or reply direction).  Note
1696 	 * that txcmd might not have been initiated yet.
1697 	 *
1698 	 * If our direction has already been closed we just return without
1699 	 * doing anything.
1700 	 */
1701 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1702 		if (state->txcmd & DMSGF_DELETE)
1703 			return;
1704 		if (state->txcmd & DMSGF_REPLY)
1705 			cmd |= DMSGF_REPLY;
1706 		cmd |= DMSGF_DELETE;
1707 	} else {
1708 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1709 			cmd |= DMSGF_REPLY;
1710 	}
1711 
1712 	/*
1713 	 * Allocate the message and associate it with the existing state.
1714 	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1715 	 * allocate new state.  We have our state already.
1716 	 */
1717 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1718 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1719 		if ((state->txcmd & DMSGF_CREATE) == 0)
1720 			nmsg->any.head.cmd |= DMSGF_CREATE;
1721 	}
1722 	nmsg->any.head.error = error;
1723 
1724 	dmsg_msg_write(nmsg);
1725 }
1726 
1727 /*
1728  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1729  * we are generating a streaming reply or an intermediate acknowledgement
1730  * of some sort as part of the higher level protocol, with more to come
1731  * later.
1732  */
1733 void
1734 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1735 {
1736 	dmsg_state_t *state = msg->state;
1737 	dmsg_msg_t *nmsg;
1738 	uint32_t cmd;
1739 
1740 
1741 	/*
1742 	 * Reply with a simple error code and terminate the transaction.
1743 	 */
1744 	cmd = DMSG_LNK_ERROR;
1745 
1746 	/*
1747 	 * Check if our direction has even been initiated yet, set CREATE.
1748 	 *
1749 	 * Check what direction this is (command or reply direction).  Note
1750 	 * that txcmd might not have been initiated yet.
1751 	 *
1752 	 * If our direction has already been closed we just return without
1753 	 * doing anything.
1754 	 */
1755 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1756 		if (state->txcmd & DMSGF_DELETE)
1757 			return;
1758 		if (state->txcmd & DMSGF_REPLY)
1759 			cmd |= DMSGF_REPLY;
1760 		/* continuing transaction, do not set MSGF_DELETE */
1761 	} else {
1762 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1763 			cmd |= DMSGF_REPLY;
1764 	}
1765 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1766 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1767 		if ((state->txcmd & DMSGF_CREATE) == 0)
1768 			nmsg->any.head.cmd |= DMSGF_CREATE;
1769 	}
1770 	nmsg->any.head.error = error;
1771 
1772 	dmsg_msg_write(nmsg);
1773 }
1774 
1775 /*
1776  * Terminate a transaction given a state structure by issuing a DELETE.
1777  * (the state structure must not be &iocom->state0)
1778  */
1779 void
1780 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1781 {
1782 	dmsg_msg_t *nmsg;
1783 	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1784 
1785 	/*
1786 	 * Nothing to do if we already transmitted a delete
1787 	 */
1788 	if (state->txcmd & DMSGF_DELETE)
1789 		return;
1790 
1791 	/*
1792 	 * Set REPLY if the other end initiated the command.  Otherwise
1793 	 * we are the command direction.
1794 	 */
1795 	if (state->txcmd & DMSGF_REPLY)
1796 		cmd |= DMSGF_REPLY;
1797 
1798 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1799 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1800 		if ((state->txcmd & DMSGF_CREATE) == 0)
1801 			nmsg->any.head.cmd |= DMSGF_CREATE;
1802 	}
1803 	nmsg->any.head.error = error;
1804 	dmsg_msg_write(nmsg);
1805 }
1806 
1807 /*
1808  * Terminate a transaction given a state structure by issuing a DELETE.
1809  * (the state structure must not be &iocom->state0)
1810  */
1811 void
1812 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1813 {
1814 	dmsg_msg_t *nmsg;
1815 	uint32_t cmd = DMSG_LNK_ERROR;
1816 
1817 	/*
1818 	 * Nothing to do if we already transmitted a delete
1819 	 */
1820 	if (state->txcmd & DMSGF_DELETE)
1821 		return;
1822 
1823 	/*
1824 	 * Set REPLY if the other end initiated the command.  Otherwise
1825 	 * we are the command direction.
1826 	 */
1827 	if (state->txcmd & DMSGF_REPLY)
1828 		cmd |= DMSGF_REPLY;
1829 
1830 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1831 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
1832 		if ((state->txcmd & DMSGF_CREATE) == 0)
1833 			nmsg->any.head.cmd |= DMSGF_CREATE;
1834 	}
1835 	nmsg->any.head.error = error;
1836 	dmsg_msg_write(nmsg);
1837 }
1838 
1839 /************************************************************************
1840  *			TRANSACTION STATE HANDLING			*
1841  ************************************************************************
1842  *
1843  */
1844 
1845 /*
1846  * Process state tracking for a message after reception, prior to execution.
1847  * Possibly route the message (consuming it).
1848  *
1849  * Called with msglk held and the msg dequeued.
1850  *
1851  * All messages are called with dummy state and return actual state.
1852  * (One-off messages often just return the same dummy state).
1853  *
1854  * May request that the caller discard the message by returning
1855  * DMSG_IOQ_ERROR_EALREADY; the caller then frees it and reads another.
1856  *
1857  * --
1858  *
1859  * These routines handle persistent and command/reply message state via the
1860  * CREATE and DELETE flags.  The first message in a command or reply sequence
1861  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1862  *
1863  * There can be any number of intermediate messages belonging to the same
1864  * sequence sent in between the CREATE message and the DELETE message,
1865  * which set neither flag.  This represents a streaming command or reply.
1866  *
1867  * Any command message received with CREATE set expects a reply sequence to
1868  * be returned.  Reply sequences work the same as command sequences except the
1869  * REPLY bit is also sent.  Both the command side and reply side can
1870  * degenerate into a single message with both CREATE and DELETE set.  Note
1871  * that one side can be streaming and the other side not, or neither, or both.
1872  *
1873  * The msgid is unique for the initiator.  That is, two sides sending a new
1874  * message can use the same msgid without colliding.
1875  *
1876  * --
1877  *
1878  * ABORT sequences work by setting the ABORT flag along with normal message
1879  * state.  However, ABORTs can also be sent on half-closed messages, that is
1880  * even if the command or reply side has already sent a DELETE, as long as
1881  * the message has not been fully closed it can still send an ABORT+DELETE
1882  * to terminate the half-closed message state.
1883  *
1884  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1885  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1886  * also race, and in this situation the other side might have already
1887  * initiated a new unrelated command with the same message id.  Since
1888  * the abort has not set the CREATE flag the situation can be detected
1889  * and the message will also be discarded.
1890  *
1891  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1892  * The ABORT request is essentially integrated into the command instead
1893  * of being sent later on.  In this situation the command implementation
1894  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1895  * special-case non-blocking operation for the command.
1896  *
1897  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1898  *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
1899  *	  one-way messages are not supported.
1900  *
1901  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1902  *	  simply ignored.
1903  *
1904  * --
1905  *
1906  * One-off messages (no reply expected) are sent without an established
1907  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
1908  * For one-off messages sent over circuits msgid generally MUST be 0.
1909  *
1910  * One-off messages cannot be aborted and typically aren't processed
1911  * by these routines.  Order is still guaranteed for messages sent over
1912  * the same circuit.  The REPLY bit can be used to distinguish whether
1913  * a one-off message is a command or reply.  For example, one-off replies
1914  * will typically just contain status updates.
1915  */
1916 static int
1917 dmsg_state_msgrx(dmsg_msg_t *msg)
1918 {
1919 	dmsg_iocom_t *iocom = msg->state->iocom;
1920 	dmsg_state_t *state;
1921 	dmsg_state_t *pstate;
1922 	dmsg_state_t sdummy;
1923 	int error;
1924 
1925 	pthread_mutex_lock(&iocom->mtx);
1926 
1927 	/*
1928 	 * Lookup the circuit (pstate).  The circuit will be an open
1929 	 * transaction.  The REVCIRC bit in the message tells us which side
1930 	 * initiated it.
1931 	 */
1932 	if (msg->any.head.circuit) {
1933 		sdummy.msgid = msg->any.head.circuit;
1934 
1935 		if (msg->any.head.cmd & DMSGF_REVCIRC) {
1936 			pstate = RB_FIND(dmsg_state_tree,
1937 					 &iocom->statewr_tree,
1938 					 &sdummy);
1939 		} else {
1940 			pstate = RB_FIND(dmsg_state_tree,
1941 					 &iocom->staterd_tree,
1942 					 &sdummy);
1943 		}
1944 		if (pstate == NULL) {
1945 			fprintf(stderr,
1946 				"missing parent in stacked trans %s\n",
1947 				dmsg_msg_str(msg));
1948 			error = DMSG_IOQ_ERROR_TRANS;
1949 			pthread_mutex_unlock(&iocom->mtx);
1950 			assert(0);
1951 		}
1952 	} else {
1953 		pstate = &iocom->state0;
1954 	}
1955 
1956 	/*
1957 	 * Lookup the msgid.
1958 	 *
1959 	 * If received msg is a command state is on staterd_tree.
1960 	 * If received msg is a reply state is on statewr_tree.
1961 	 * Otherwise there is no state (retain &iocom->state0)
1962 	 */
1963 	sdummy.msgid = msg->any.head.msgid;
1964 	if (msg->any.head.cmd & DMSGF_REVTRANS)
1965 		state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
1966 	else
1967 		state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
1968 
1969 	if (state) {
1970 		/*
1971 		 * Message over an existing transaction (CREATE should not
1972 		 * be set).
1973 		 */
1974 		msg->state = state;
1975 		assert(pstate == state->parent);
1976 	} else {
1977 		/*
1978 		 * Either a new transaction (if CREATE set) or a one-off.
1979 		 */
1980 		state = pstate;
1981 	}
1982 
1983 	pthread_mutex_unlock(&iocom->mtx);
1984 
1985 	/*
1986 	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1987 	 * inside the case statements.
1988 	 *
1989 	 * Construct new state as necessary.
1990 	 */
1991 	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1992 				    DMSGF_REPLY)) {
1993 	case DMSGF_CREATE:
1994 	case DMSGF_CREATE | DMSGF_DELETE:
1995 		/*
1996 		 * Create new sub-transaction under pstate.
1997 		 * (any DELETE is handled in post-processing of msg).
1998 		 *
1999 		 * (During routing the msgid was made unique for this
2000 		 * direction over the comlink, so our RB trees can be
2001 		 * iocom-based instead of state-based).
2002 		 */
2003 		if (state != pstate) {
2004 			fprintf(stderr,
2005 				"duplicate transaction %s\n",
2006 				dmsg_msg_str(msg));
2007 			error = DMSG_IOQ_ERROR_TRANS;
2008 			assert(0);
2009 			break;
2010 		}
2011 
2012 		/*
2013 		 * Allocate the new state.
2014 		 */
2015 		state = malloc(sizeof(*state));
2016 		atomic_add_int(&dmsg_state_count, 1);
2017 		bzero(state, sizeof(*state));
2018 		TAILQ_INIT(&state->subq);
2019 		dmsg_state_hold(pstate);
2020 		state->refs = 1;
2021 		state->parent = pstate;
2022 		state->iocom = iocom;
2023 		state->flags = DMSG_STATE_DYNAMIC |
2024 			       DMSG_STATE_OPPOSITE;
2025 		state->msgid = msg->any.head.msgid;
2026 		state->txcmd = DMSGF_REPLY;
2027 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2028 		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2029 		msg->state = state;
2030 		pthread_mutex_lock(&iocom->mtx);
2031 		RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2032 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2033 		state->flags |= DMSG_STATE_INSERTED;
2034 
2035 		/*
2036 		 * If the parent is a relay set up the state handler to
2037 		 * automatically route the message.  Local processing will
2038 		 * not occur if set.
2039 		 *
2040 		 * (state relays are seeded by SPAN processing)
2041 		 */
2042 		if (pstate->relay)
2043 			state->func = dmsg_state_relay;
2044 		pthread_mutex_unlock(&iocom->mtx);
2045 		error = 0;
2046 
2047 		if (DMsgDebugOpt) {
2048 			fprintf(stderr,
2049 				"create state %p id=%08x on iocom staterd %p\n",
2050 				state, (uint32_t)state->msgid, iocom);
2051 		}
2052 		break;
2053 	case DMSGF_DELETE:
2054 		/*
2055 		 * Persistent state is expected but might not exist if an
2056 		 * ABORT+DELETE races the close.
2057 		 *
2058 		 * (any DELETE is handled in post-processing of msg).
2059 		 */
2060 		if (state == pstate) {
2061 			if (msg->any.head.cmd & DMSGF_ABORT) {
2062 				error = DMSG_IOQ_ERROR_EALREADY;
2063 			} else {
2064 				fprintf(stderr, "missing-state %s\n",
2065 					dmsg_msg_str(msg));
2066 				error = DMSG_IOQ_ERROR_TRANS;
2067 				assert(0);
2068 			}
2069 			break;
2070 		}
2071 
2072 		/*
2073 		 * Handle another ABORT+DELETE case if the msgid has already
2074 		 * been reused.
2075 		 */
2076 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2077 			if (msg->any.head.cmd & DMSGF_ABORT) {
2078 				error = DMSG_IOQ_ERROR_EALREADY;
2079 			} else {
2080 				fprintf(stderr, "reused-state %s\n",
2081 					dmsg_msg_str(msg));
2082 				error = DMSG_IOQ_ERROR_TRANS;
2083 				assert(0);
2084 			}
2085 			break;
2086 		}
2087 		error = 0;
2088 		break;
2089 	default:
2090 		/*
2091 		 * Check for mid-stream ABORT command received, otherwise
2092 		 * allow.
2093 		 */
2094 		if (msg->any.head.cmd & DMSGF_ABORT) {
2095 			if ((state == pstate) ||
2096 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2097 				error = DMSG_IOQ_ERROR_EALREADY;
2098 				break;
2099 			}
2100 		}
2101 		error = 0;
2102 		break;
2103 	case DMSGF_REPLY | DMSGF_CREATE:
2104 	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2105 		/*
2106 		 * When receiving a reply with CREATE set the original
2107 		 * persistent state message should already exist.
2108 		 */
2109 		if (state == pstate) {
2110 			fprintf(stderr, "no-state(r) %s\n",
2111 				dmsg_msg_str(msg));
2112 			error = DMSG_IOQ_ERROR_TRANS;
2113 			assert(0);
2114 			break;
2115 		}
2116 		assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2117 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2118 		error = 0;
2119 		break;
2120 	case DMSGF_REPLY | DMSGF_DELETE:
2121 		/*
2122 		 * Received REPLY+ABORT+DELETE in case where msgid has
2123 		 * already been fully closed, ignore the message.
2124 		 */
2125 		if (state == pstate) {
2126 			if (msg->any.head.cmd & DMSGF_ABORT) {
2127 				error = DMSG_IOQ_ERROR_EALREADY;
2128 			} else {
2129 				fprintf(stderr, "no-state(r,d) %s\n",
2130 					dmsg_msg_str(msg));
2131 				error = DMSG_IOQ_ERROR_TRANS;
2132 				assert(0);
2133 			}
2134 			break;
2135 		}
2136 
2137 		/*
2138 		 * Received REPLY+ABORT+DELETE in case where msgid has
2139 		 * already been reused for an unrelated message,
2140 		 * ignore the message.
2141 		 */
2142 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2143 			if (msg->any.head.cmd & DMSGF_ABORT) {
2144 				error = DMSG_IOQ_ERROR_EALREADY;
2145 			} else {
2146 				fprintf(stderr, "reused-state(r,d) %s\n",
2147 					dmsg_msg_str(msg));
2148 				error = DMSG_IOQ_ERROR_TRANS;
2149 				assert(0);
2150 			}
2151 			break;
2152 		}
2153 		error = 0;
2154 		break;
2155 	case DMSGF_REPLY:
2156 		/*
2157 		 * Check for mid-stream ABORT reply received to sent command.
2158 		 */
2159 		if (msg->any.head.cmd & DMSGF_ABORT) {
2160 			if (state == pstate ||
2161 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2162 				error = DMSG_IOQ_ERROR_EALREADY;
2163 				break;
2164 			}
2165 		}
2166 		error = 0;
2167 		break;
2168 	}
2169 
2170 	/*
2171 	 * Calculate the easy-switch() transactional command.  Represents
2172 	 * the outer-transaction command for any transaction-create or
2173 	 * transaction-delete, and the inner message command for any
2174 	 * non-transaction or inside-transaction command.  tcmd will be
2175 	 * set to 0 for any messaging error condition.
2176 	 *
2177 	 * The two can be told apart because outer-transaction commands
2178 	 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2179 	 */
2180 	if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2181 		if ((state->flags & DMSG_STATE_ROOT) == 0) {
2182 			msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
2183 				    (msg->any.head.cmd & (DMSGF_CREATE |
2184 							  DMSGF_DELETE |
2185 							  DMSGF_REPLY));
2186 		} else {
2187 			msg->tcmd = 0;
2188 		}
2189 	} else {
2190 		msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2191 	}
2192 	return (error);
2193 }
2194 
2195 /*
2196  * Route the message and handle pair-state processing.
2197  */
2198 void
2199 dmsg_state_relay(dmsg_msg_t *lmsg)
2200 {
2201 	dmsg_state_t *lpstate;
2202 	dmsg_state_t *rpstate;
2203 	dmsg_state_t *lstate;
2204 	dmsg_state_t *rstate;
2205 	dmsg_msg_t *rmsg;
2206 
2207 	if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2208 	    DMSGF_CREATE) {
2209 		/*
2210 		 * New sub-transaction, establish new state and relay.
2211 		 */
2212 		lstate = lmsg->state;
2213 		lpstate = lstate->parent;
2214 		rpstate = lpstate->relay;
2215 		assert(lstate->relay == NULL);
2216 		assert(rpstate != NULL);
2217 
2218 		rmsg = dmsg_msg_alloc(rpstate,
2219 				      lmsg->aux_size,
2220 				      lmsg->any.head.cmd,
2221 				      dmsg_state_relay, NULL);
2222 		rstate = rmsg->state;
2223 		rstate->relay = lstate;
2224 		lstate->relay = rstate;
2225 		dmsg_state_hold(lstate);
2226 		dmsg_state_hold(rstate);
2227 	} else {
2228 		/*
2229 		 * State & relay already established
2230 		 */
2231 		lstate = lmsg->state;
2232 		rstate = lstate->relay;
2233 		assert(rstate != NULL);
2234 
2235 		rmsg = dmsg_msg_alloc(rstate,
2236 				      lmsg->aux_size,
2237 				      lmsg->any.head.cmd,
2238 				      dmsg_state_relay, NULL);
2239 	}
2240 	if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2241 		bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2242 		      lmsg->hdr_size - sizeof(lmsg->any.head));
2243 	}
2244 	rmsg->any.head.error = lmsg->any.head.error;
2245 	rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2246 	rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2247 	rmsg->aux_data = lmsg->aux_data;
2248 	lmsg->aux_data = NULL;
2249 	/*
2250 	fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2251 	*/
2252 	dmsg_msg_write(rmsg);
2253 }
2254 
2255 /*
2256  * Cleanup and retire msg after processing
2257  */
2258 void
2259 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2260 {
2261 	dmsg_state_t *state;
2262 	dmsg_state_t *pstate;
2263 
2264 	assert(msg->state->iocom == iocom);
2265 	state = msg->state;
2266 	if (state->flags & DMSG_STATE_ROOT) {
2267 		/*
2268 		 * Free a non-transactional message, there is no state
2269 		 * to worry about.
2270 		 */
2271 		dmsg_msg_free(msg);
2272 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2273 		/*
2274 		 * Message terminating transaction, destroy the related
2275 		 * state, the original message, and this message (if it
2276 		 * isn't the original message due to a CREATE|DELETE).
2277 		 *
2278 		 * It's possible for governing state to terminate while
2279 		 * sub-transactions still exist.  This is allowed but
2280 		 * will cause sub-transactions to recursively fail.
2281 		 * Further reception of sub-transaction messages will be
2282 		 * impossible because the circuit will no longer exist.
2283 		 * (XXX need code to make sure that happens properly).
2284 		 */
2285 		pthread_mutex_lock(&iocom->mtx);
2286 		state->rxcmd |= DMSGF_DELETE;
2287 
2288 		if (state->txcmd & DMSGF_DELETE) {
2289 			assert(state->flags & DMSG_STATE_INSERTED);
2290 			if (state->rxcmd & DMSGF_REPLY) {
2291 				assert(msg->any.head.cmd & DMSGF_REPLY);
2292 				RB_REMOVE(dmsg_state_tree,
2293 					  &iocom->statewr_tree, state);
2294 			} else {
2295 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2296 				RB_REMOVE(dmsg_state_tree,
2297 					  &iocom->staterd_tree, state);
2298 			}
2299 			pstate = state->parent;
2300 			TAILQ_REMOVE(&pstate->subq, state, entry);
2301 			state->flags &= ~DMSG_STATE_INSERTED;
2302 			state->parent = NULL;
2303 			dmsg_state_drop(pstate);
2304 
2305 			if (state->relay) {
2306 				dmsg_state_drop(state->relay);
2307 				state->relay = NULL;
2308 			}
2309 			dmsg_msg_free(msg);
2310 			dmsg_state_drop(state);
2311 		} else {
2312 			dmsg_msg_free(msg);
2313 		}
2314 		pthread_mutex_unlock(&iocom->mtx);
2315 	} else {
2316 		/*
2317 		 * Message not terminating transaction, leave state intact
2318 		 * and free message if it isn't the CREATE message.
2319 		 */
2320 		dmsg_msg_free(msg);
2321 	}
2322 }
2323 
2324 /*
2325  * Clean up the state after pulling out needed fields and queueing the
2326  * message for transmission.   This occurs in dmsg_msg_write().
2327  */
2328 static void
2329 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2330 {
2331 	dmsg_state_t *state;
2332 	dmsg_state_t *pstate;
2333 
2334 	assert(iocom == msg->state->iocom);
2335 	state = msg->state;
2336 	if (state->flags & DMSG_STATE_ROOT) {
2337 		;
2338 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2339 		/*
2340 		 * Message terminating transaction, destroy the related
2341 		 * state, the original message, and this message (if it
2342 		 * isn't the original message due to a CREATE|DELETE).
2343 		 *
2344 		 * It's possible for governing state to terminate while
2345 		 * sub-transactions still exist.  This is allowed but
2346 		 * will cause sub-transactions to recursively fail.
2347 		 * Further reception of sub-transaction messages will be
2348 		 * impossible because the circuit will no longer exist.
2349 		 * (XXX need code to make sure that happens properly).
2350 		 */
2351 		pthread_mutex_lock(&iocom->mtx);
2352 		assert((state->txcmd & DMSGF_DELETE) == 0);
2353 		state->txcmd |= DMSGF_DELETE;
2354 		if (state->rxcmd & DMSGF_DELETE) {
2355 			assert(state->flags & DMSG_STATE_INSERTED);
2356 			if (state->txcmd & DMSGF_REPLY) {
2357 				assert(msg->any.head.cmd & DMSGF_REPLY);
2358 				RB_REMOVE(dmsg_state_tree,
2359 					  &iocom->staterd_tree, state);
2360 			} else {
2361 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2362 				RB_REMOVE(dmsg_state_tree,
2363 					  &iocom->statewr_tree, state);
2364 			}
2365 			pstate = state->parent;
2366 			TAILQ_REMOVE(&pstate->subq, state, entry);
2367 			state->flags &= ~DMSG_STATE_INSERTED;
2368 			state->parent = NULL;
2369 			dmsg_state_drop(pstate);
2370 
2371 			if (state->relay) {
2372 				dmsg_state_drop(state->relay);
2373 				state->relay = NULL;
2374 			}
2375 			dmsg_state_drop(state);	/* usually the last drop */
2376 		}
2377 		pthread_mutex_unlock(&iocom->mtx);
2378 	}
2379 }
2380 
2381 /*
2382  * Called with or without locks
2383  */
2384 void
2385 dmsg_state_hold(dmsg_state_t *state)
2386 {
2387 	atomic_add_int(&state->refs, 1);
2388 }
2389 
2390 void
2391 dmsg_state_drop(dmsg_state_t *state)
2392 {
2393 	if (atomic_fetchadd_int(&state->refs, -1) == 1)
2394 		dmsg_state_free(state);
2395 }
2396 
2397 /*
2398  * Called with iocom locked
2399  */
2400 static void
2401 dmsg_state_free(dmsg_state_t *state)
2402 {
2403 	atomic_add_int(&dmsg_state_count, -1);
2404 	if (DMsgDebugOpt) {
2405 		fprintf(stderr, "terminate state %p id=%08x\n",
2406 			state, (uint32_t)state->msgid);
2407 	}
2408 	assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_INSERTED)) == 0);
2409 	assert(TAILQ_EMPTY(&state->subq));
2410 	assert(state->refs == 0);
2411 	if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2412 		closefrom(3);
2413 	assert(state->any.any == NULL);
2414 	free(state);
2415 }
2416 
2417 /*
2418  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2419  * header is not adjusted, just the core header.
2420  */
2421 void
2422 dmsg_bswap_head(dmsg_hdr_t *head)
2423 {
2424 	head->magic	= bswap16(head->magic);
2425 	head->reserved02 = bswap16(head->reserved02);
2426 	head->salt	= bswap32(head->salt);
2427 
2428 	head->msgid	= bswap64(head->msgid);
2429 	head->circuit	= bswap64(head->circuit);
2430 	head->reserved18= bswap64(head->reserved18);
2431 
2432 	head->cmd	= bswap32(head->cmd);
2433 	head->aux_crc	= bswap32(head->aux_crc);
2434 	head->aux_bytes	= bswap32(head->aux_bytes);
2435 	head->error	= bswap32(head->error);
2436 	head->aux_descr = bswap64(head->aux_descr);
2437 	head->reserved38= bswap32(head->reserved38);
2438 	head->hdr_crc	= bswap32(head->hdr_crc);
2439 }
2440