xref: /dragonfly/lib/libdmsg/msg.c (revision 4d0c54c1)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "dmsg_local.h"
37 
38 int DMsgDebugOpt;
39 
40 static int dmsg_state_msgrx(dmsg_msg_t *msg);
41 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
42 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
43 
44 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
45 RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
46 
47 /*
48  * STATE TREE - Represents open transactions which are indexed by their
49  *		{ msgid } relative to the governing iocom.
50  */
51 int
52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
53 {
54 	if (state1->msgid < state2->msgid)
55 		return(-1);
56 	if (state1->msgid > state2->msgid)
57 		return(1);
58 	return(0);
59 }
60 
61 /*
62  * CIRCUIT TREE - Represents open circuits which are indexed by their
63  *		  { msgid } relative to the governing iocom.
64  */
65 int
66 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
67 {
68 	if (circuit1->msgid < circuit2->msgid)
69 		return(-1);
70 	if (circuit1->msgid > circuit2->msgid)
71 		return(1);
72 	return(0);
73 }
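
/*
 * Illustrative sketch (not part of the original source): transactions and
 * circuits are looked up by filling in a dummy key with just the msgid and
 * calling RB_FIND() with the comparators above, as dmsg_state_msgrx()
 * does further below.
 */
#if 0
	dmsg_state_t sdummy;
	dmsg_state_t *state;

	sdummy.msgid = msg->any.head.msgid;
	state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree, &sdummy);
#endif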
74 
75 /*
76  * Initialize a low-level ioq
77  */
78 void
79 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
80 {
81 	bzero(ioq, sizeof(*ioq));
82 	ioq->state = DMSG_MSGQ_STATE_HEADER1;
83 	TAILQ_INIT(&ioq->msgq);
84 }
85 
86 /*
87  * Cleanup queue.
88  *
89  * caller holds iocom->mtx.
90  */
91 void
92 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
93 {
94 	dmsg_msg_t *msg;
95 
96 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
97 		assert(0);	/* shouldn't happen */
98 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
99 		dmsg_msg_free(msg);
100 	}
101 	if ((msg = ioq->msg) != NULL) {
102 		ioq->msg = NULL;
103 		dmsg_msg_free(msg);
104 	}
105 }
106 
107 /*
108  * Initialize a low-level communications channel.
109  *
110  * NOTE: The signal_func() is called at least once from the loop and can be
111  *	 re-armed via dmsg_iocom_restate().
112  */
113 void
114 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
115 		   void (*signal_func)(dmsg_iocom_t *),
116 		   void (*rcvmsg_func)(dmsg_msg_t *),
117 		   void (*dbgmsg_func)(dmsg_msg_t *),
118 		   void (*altmsg_func)(dmsg_iocom_t *))
119 {
120 	struct stat st;
121 
122 	bzero(iocom, sizeof(*iocom));
123 
124 	asprintf(&iocom->label, "iocom-%p", iocom);
125 	iocom->signal_callback = signal_func;
126 	iocom->rcvmsg_callback = rcvmsg_func;
127 	iocom->altmsg_callback = altmsg_func;
128 	iocom->dbgmsg_callback = dbgmsg_func;
129 
130 	pthread_mutex_init(&iocom->mtx, NULL);
131 	RB_INIT(&iocom->circuit_tree);
132 	TAILQ_INIT(&iocom->freeq);
133 	TAILQ_INIT(&iocom->freeq_aux);
134 	TAILQ_INIT(&iocom->txmsgq);
135 	iocom->sock_fd = sock_fd;
136 	iocom->alt_fd = alt_fd;
137 	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
138 	if (signal_func)
139 		iocom->flags |= DMSG_IOCOMF_SWORK;
140 	dmsg_ioq_init(iocom, &iocom->ioq_rx);
141 	dmsg_ioq_init(iocom, &iocom->ioq_tx);
142 	if (pipe(iocom->wakeupfds) < 0)
143 		assert(0);
144 	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
145 	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
146 
147 	dmsg_circuit_init(iocom, &iocom->circuit0);
148 
149 	/*
150 	 * Negotiate session crypto synchronously.  This will mark the
151 	 * connection as error'd if it fails.  If this is a pipe it's
152 	 * a linkage that we set up ourselves to the filesystem and there
153 	 * is no crypto.
154 	 */
155 	if (fstat(sock_fd, &st) < 0)
156 		assert(0);
157 	if (S_ISSOCK(st.st_mode))
158 		dmsg_crypto_negotiate(iocom);
159 
160 	/*
161 	 * Make sure our fds are set to non-blocking for the iocom core.
162 	 */
163 	if (sock_fd >= 0)
164 		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
165 #if 0
166 	/* if line buffered our single fgets() should be fine */
167 	if (alt_fd >= 0)
168 		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
169 #endif
170 }
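
/*
 * Illustrative usage sketch (not part of the original source; the callback
 * names are hypothetical): a caller typically initializes the iocom on a
 * connected socket descriptor and then runs the core loop from a dedicated
 * thread.
 */
#if 0
	dmsg_iocom_t iocom;

	dmsg_iocom_init(&iocom, sock_fd, -1,
			my_signal_cb,	/* re-armed via dmsg_iocom_restate() */
			my_rcvmsg_cb,	/* called for each received message */
			my_dbgmsg_cb,	/* debug protocol messages */
			NULL);		/* no alternate bulk fd */
	dmsg_iocom_core(&iocom);	/* runs until DMSG_IOCOMF_EOF is set */
	dmsg_iocom_done(&iocom);
#endif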
171 
172 void
173 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
174 {
175 	va_list va;
176 	char *optr;
177 
178 	va_start(va, ctl);
179 	optr = iocom->label;
180 	vasprintf(&iocom->label, ctl, va);
181 	va_end(va);
182 	if (optr)
183 		free(optr);
184 }
185 
186 /*
187  * May only be called from a callback from iocom_core.
188  *
189  * Adjust the state machine functions and set flags to guarantee that
190  * both the rcvmsg_func and the sendmsg_func are called at least once.
191  */
192 void
193 dmsg_iocom_restate(dmsg_iocom_t *iocom,
194 		   void (*signal_func)(dmsg_iocom_t *),
195 		   void (*rcvmsg_func)(dmsg_msg_t *msg),
196 		   void (*altmsg_func)(dmsg_iocom_t *))
197 {
198 	pthread_mutex_lock(&iocom->mtx);
199 	iocom->signal_callback = signal_func;
200 	iocom->rcvmsg_callback = rcvmsg_func;
201 	iocom->altmsg_callback = altmsg_func;
202 	if (signal_func)
203 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
204 	else
205 		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206 	pthread_mutex_unlock(&iocom->mtx);
207 }
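
/*
 * Illustrative sketch (not part of the original source; callback names are
 * hypothetical): a protocol stage typically switches the iocom to its next
 * stage from within a callback running in the iocom_core thread.
 */
#if 0
	dmsg_iocom_restate(iocom, next_signal_cb, next_rcvmsg_cb, NULL);
#endif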
208 
209 void
210 dmsg_iocom_signal(dmsg_iocom_t *iocom)
211 {
212 	pthread_mutex_lock(&iocom->mtx);
213 	if (iocom->signal_callback)
214 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
215 	pthread_mutex_unlock(&iocom->mtx);
216 }
217 
218 /*
219  * Cleanup a terminating iocom.
220  *
221  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
222  * from all possible references to it.
223  */
224 void
225 dmsg_iocom_done(dmsg_iocom_t *iocom)
226 {
227 	dmsg_msg_t *msg;
228 
229 	if (iocom->sock_fd >= 0) {
230 		close(iocom->sock_fd);
231 		iocom->sock_fd = -1;
232 	}
233 	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
234 		close(iocom->alt_fd);
235 		iocom->alt_fd = -1;
236 	}
237 	dmsg_ioq_done(iocom, &iocom->ioq_rx);
238 	dmsg_ioq_done(iocom, &iocom->ioq_tx);
239 	while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
240 		TAILQ_REMOVE(&iocom->freeq, msg, qentry);
241 		free(msg);
242 	}
243 	while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
244 		TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
245 		free(msg->aux_data);
246 		msg->aux_data = NULL;
247 		free(msg);
248 	}
249 	if (iocom->wakeupfds[0] >= 0) {
250 		close(iocom->wakeupfds[0]);
251 		iocom->wakeupfds[0] = -1;
252 	}
253 	if (iocom->wakeupfds[1] >= 0) {
254 		close(iocom->wakeupfds[1]);
255 		iocom->wakeupfds[1] = -1;
256 	}
257 	pthread_mutex_destroy(&iocom->mtx);
258 }
259 
260 /*
261  * Basic initialization of a circuit structure.
262  *
263  * The circuit structure is initialized with one ref.
264  */
265 void
266 dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
267 {
268 	circuit->refs = 1;
269 	circuit->iocom = iocom;
270 	RB_INIT(&circuit->staterd_tree);
271 	RB_INIT(&circuit->statewr_tree);
272 }
273 
274 /*
275  * Allocate a new one-way message.
276  */
277 dmsg_msg_t *
278 dmsg_msg_alloc(dmsg_circuit_t *circuit,
279 	       size_t aux_size, uint32_t cmd,
280 	       void (*func)(dmsg_msg_t *), void *data)
281 {
282 	dmsg_iocom_t *iocom = circuit->iocom;
283 	dmsg_state_t *state = NULL;
284 	dmsg_msg_t *msg;
285 	int hbytes;
286 	size_t aligned_size;
287 
288 	pthread_mutex_lock(&iocom->mtx);
289 #if 0
290 	if (aux_size) {
291 		aligned_size = DMSG_DOALIGN(aux_size);
292 		if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
293 			TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
294 	} else {
295 		aligned_size = 0;
296 		if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
297 			TAILQ_REMOVE(&iocom->freeq, msg, qentry);
298 	}
299 #endif
300 	aligned_size = DMSG_DOALIGN(aux_size);
301 	msg = NULL;
302 	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
303 		/*
304 		 * Create state when CREATE is set without REPLY.
305 		 * Assign a unique msgid, in this case simply using
306 		 * the pointer value for 'state'.
307 		 *
308 		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
309 		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
310 		 *
311 		 * NOTE: state initiated by us and state initiated by
312 		 *	 a remote create are placed in different RB trees.
313 		 *	 The msgid for SPAN state is used in source/target
314 		 *	 for message routing as appropriate.
315 		 */
316 		state = malloc(sizeof(*state));
317 		bzero(state, sizeof(*state));
318 		state->iocom = iocom;
319 		state->circuit = circuit;
320 		state->flags = DMSG_STATE_DYNAMIC;
321 		state->msgid = (uint64_t)(uintptr_t)state;
322 		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
323 		state->rxcmd = DMSGF_REPLY;
324 		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
325 		state->func = func;
326 		state->any.any = data;
327 		RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
328 		state->flags |= DMSG_STATE_INSERTED;
329 	}
330 	/* XXX SMP race for state */
331 	pthread_mutex_unlock(&iocom->mtx);
332 	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
333 	if (msg == NULL) {
334 		msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
335 		bzero(msg, offsetof(struct dmsg_msg, any.head));
336 		*(int *)((char *)msg +
337 			 offsetof(struct dmsg_msg, any.head) + hbytes) =
338 				 0x71B2C3D4;
339 #if 0
340 		msg = malloc(sizeof(*msg));
341 		bzero(msg, sizeof(*msg));
342 #endif
343 	}
344 
345 	/*
346 	 * [re]allocate the auxiliary data buffer.  The caller knows that
347 	 * a size-aligned buffer will be allocated but we do not want to
348 	 * force the caller to zero any tail piece, so we do that ourselves.
349 	 */
350 	if (msg->aux_size != aux_size) {
351 		if (msg->aux_data) {
352 			free(msg->aux_data);
353 			msg->aux_data = NULL;
354 			msg->aux_size = 0;
355 		}
356 		if (aux_size) {
357 			msg->aux_data = malloc(aligned_size);
358 			msg->aux_size = aux_size;
359 			if (aux_size != aligned_size) {
360 				bzero(msg->aux_data + aux_size,
361 				      aligned_size - aux_size);
362 			}
363 		}
364 	}
365 	if (hbytes)
366 		bzero(&msg->any.head, hbytes);
367 	msg->hdr_size = hbytes;
368 	msg->any.head.magic = DMSG_HDR_MAGIC;
369 	msg->any.head.cmd = cmd;
370 	msg->any.head.aux_descr = 0;
371 	msg->any.head.aux_crc = 0;
372 	msg->any.head.circuit = 0;
373 	msg->circuit = circuit;
374 	msg->iocom = iocom;
375 	dmsg_circuit_hold(circuit);
376 	if (state) {
377 		msg->state = state;
378 		state->msg = msg;
379 		msg->any.head.msgid = state->msgid;
380 	} else {
381 		msg->any.head.msgid = 0;
382 	}
383 	return (msg);
384 }
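
/*
 * Illustrative usage sketch (not part of the original source): open a new
 * transaction by allocating a message with DMSGF_CREATE and queueing it.
 * DMSG_DBG_SHELL is used here only as an example command code and
 * my_reply_cb/my_data are hypothetical.
 */
#if 0
	dmsg_msg_t *msg;

	msg = dmsg_msg_alloc(circuit, 0, DMSG_DBG_SHELL | DMSGF_CREATE,
			     my_reply_cb, my_data);
	dmsg_msg_write(msg);	/* CREATE in txcmd handled by dmsg_msg_write() */
#endif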
385 
386 /*
387  * Free a message so it can be reused afresh.
388  *
389  * NOTE: aux_size can be 0 with a non-NULL aux_data.
390  */
391 static
392 void
393 dmsg_msg_free_locked(dmsg_msg_t *msg)
394 {
395 	/*dmsg_iocom_t *iocom = msg->iocom;*/
396 #if 1
397 	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
398 	if (*(int *)((char *)msg +
399 		     offsetof(struct  dmsg_msg, any.head) + hbytes) !=
400 	     0x71B2C3D4) {
401 		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
402 		assert(0);
403 	}
404 #endif
405 	if (msg->circuit) {
406 		dmsg_circuit_drop_locked(msg->circuit);
407 		msg->circuit = NULL;
408 	}
409 	msg->state = NULL;
410 	if (msg->aux_data) {
411 		free(msg->aux_data);
412 		msg->aux_data = NULL;
413 	}
414 	msg->aux_size = 0;
415 	free (msg);
416 #if 0
417 	if (msg->aux_data)
418 		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
419 	else
420 		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
421 #endif
422 }
423 
424 void
425 dmsg_msg_free(dmsg_msg_t *msg)
426 {
427 	dmsg_iocom_t *iocom = msg->iocom;
428 
429 	pthread_mutex_lock(&iocom->mtx);
430 	dmsg_msg_free_locked(msg);
431 	pthread_mutex_unlock(&iocom->mtx);
432 }
433 
434 /*
435  * I/O core loop for an iocom.
436  *
437  * Thread localized, iocom->mtx not held.
438  */
439 void
440 dmsg_iocom_core(dmsg_iocom_t *iocom)
441 {
442 	struct pollfd fds[3];
443 	char dummybuf[256];
444 	dmsg_msg_t *msg;
445 	int timeout;
446 	int count;
447 	int wi;	/* wakeup pipe */
448 	int si;	/* socket */
449 	int ai;	/* alt bulk path socket */
450 
451 	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
452 		/*
453 		 * These iocom->flags are only manipulated within the
454 		 * context of the current thread.  However, modifications
455 		 * still require atomic ops.
456 		 */
457 		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
458 				     DMSG_IOCOMF_WWORK |
459 				     DMSG_IOCOMF_PWORK |
460 				     DMSG_IOCOMF_SWORK |
461 				     DMSG_IOCOMF_ARWORK |
462 				     DMSG_IOCOMF_AWWORK)) == 0) {
463 			/*
464 			 * Only poll if no immediate work is pending.
465 			 * Otherwise we are just wasting our time calling
466 			 * poll.
467 			 */
468 			timeout = 5000;
469 
470 			count = 0;
471 			wi = -1;
472 			si = -1;
473 			ai = -1;
474 
475 			/*
476 			 * Always check the inter-thread pipe, e.g.
477 			 * for iocom->txmsgq work.
478 			 */
479 			wi = count++;
480 			fds[wi].fd = iocom->wakeupfds[0];
481 			fds[wi].events = POLLIN;
482 			fds[wi].revents = 0;
483 
484 			/*
485 			 * Check the socket input/output direction as
486 			 * requested
487 			 */
488 			if (iocom->flags & (DMSG_IOCOMF_RREQ |
489 					    DMSG_IOCOMF_WREQ)) {
490 				si = count++;
491 				fds[si].fd = iocom->sock_fd;
492 				fds[si].events = 0;
493 				fds[si].revents = 0;
494 
495 				if (iocom->flags & DMSG_IOCOMF_RREQ)
496 					fds[si].events |= POLLIN;
497 				if (iocom->flags & DMSG_IOCOMF_WREQ)
498 					fds[si].events |= POLLOUT;
499 			}
500 
501 			/*
502 			 * Check the alternative fd for work.
503 			 */
504 			if (iocom->alt_fd >= 0) {
505 				ai = count++;
506 				fds[ai].fd = iocom->alt_fd;
507 				fds[ai].events = POLLIN;
508 				fds[ai].revents = 0;
509 			}
510 			poll(fds, count, timeout);
511 
512 			if (wi >= 0 && (fds[wi].revents & POLLIN))
513 				atomic_set_int(&iocom->flags,
514 					       DMSG_IOCOMF_PWORK);
515 			if (si >= 0 && (fds[si].revents & POLLIN))
516 				atomic_set_int(&iocom->flags,
517 					       DMSG_IOCOMF_RWORK);
518 			if (si >= 0 && (fds[si].revents & POLLOUT))
519 				atomic_set_int(&iocom->flags,
520 					       DMSG_IOCOMF_WWORK);
521 			if (wi >= 0 && (fds[wi].revents & POLLOUT))
522 				atomic_set_int(&iocom->flags,
523 					       DMSG_IOCOMF_WWORK);
524 			if (ai >= 0 && (fds[ai].revents & POLLIN))
525 				atomic_set_int(&iocom->flags,
526 					       DMSG_IOCOMF_ARWORK);
527 		} else {
528 			/*
529 			 * Always check the pipe
530 			 */
531 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
532 		}
533 
534 		if (iocom->flags & DMSG_IOCOMF_SWORK) {
535 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
536 			iocom->signal_callback(iocom);
537 		}
538 
539 		/*
540 		 * Pending message queues from other threads wake us up
541 		 * with a write to the wakeupfds[] pipe.  We have to clear
542 		 * the pipe with a dummy read.
543 		 */
544 		if (iocom->flags & DMSG_IOCOMF_PWORK) {
545 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
546 			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
547 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
548 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
549 			if (TAILQ_FIRST(&iocom->txmsgq))
550 				dmsg_iocom_flush1(iocom);
551 		}
552 
553 		/*
554 		 * Message write sequencing
555 		 */
556 		if (iocom->flags & DMSG_IOCOMF_WWORK)
557 			dmsg_iocom_flush1(iocom);
558 
559 		/*
560 		 * Message read sequencing.  Run this after the write
561 		 * sequencing in case the write sequencing allowed another
562 		 * auto-DELETE to occur on the read side.
563 		 */
564 		if (iocom->flags & DMSG_IOCOMF_RWORK) {
565 			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
566 			       (msg = dmsg_ioq_read(iocom)) != NULL) {
567 				if (DMsgDebugOpt) {
568 					fprintf(stderr, "receive %s\n",
569 						dmsg_msg_str(msg));
570 				}
571 				iocom->rcvmsg_callback(msg);
572 				dmsg_state_cleanuprx(iocom, msg);
573 			}
574 		}
575 
576 		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
577 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
578 			iocom->altmsg_callback(iocom);
579 		}
580 	}
581 }
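
/*
 * Illustrative sketch (not part of the original source): other threads wake
 * this poll loop by writing a byte to the wakeup pipe after queueing to
 * iocom->txmsgq, which is exactly what dmsg_msg_write() does.
 */
#if 0
	char dummy = 0;

	pthread_mutex_lock(&iocom->mtx);
	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
	write(iocom->wakeupfds[1], &dummy, 1);
	pthread_mutex_unlock(&iocom->mtx);
#endif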
582 
583 /*
584  * Make sure there's enough room in the FIFO to hold the
585  * needed data.
586  *
587  * Assume worst case encrypted form is 2x the size of the
588  * plaintext equivalent.
589  */
590 static
591 size_t
592 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
593 {
594 	size_t bytes;
595 	size_t nmax;
596 
597 	bytes = ioq->fifo_cdx - ioq->fifo_beg;
598 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
599 	if (bytes + nmax / 2 < needed) {
600 		if (bytes) {
601 			bcopy(ioq->buf + ioq->fifo_beg,
602 			      ioq->buf,
603 			      bytes);
604 		}
605 		ioq->fifo_cdx -= ioq->fifo_beg;
606 		ioq->fifo_beg = 0;
607 		if (ioq->fifo_cdn < ioq->fifo_end) {
608 			bcopy(ioq->buf + ioq->fifo_cdn,
609 			      ioq->buf + ioq->fifo_cdx,
610 			      ioq->fifo_end - ioq->fifo_cdn);
611 		}
612 		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
613 		ioq->fifo_cdn = ioq->fifo_cdx;
614 		nmax = sizeof(ioq->buf) - ioq->fifo_end;
615 	}
616 	return(nmax);
617 }
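
/*
 * Illustrative note (not part of the original source): the receive FIFO
 * indices are maintained such that
 *
 *	fifo_beg <= fifo_cdx <= fifo_cdn <= fifo_end <= sizeof(ioq->buf)
 *
 * [fifo_beg, fifo_cdx) is decrypted data not yet consumed and
 * [fifo_cdn, fifo_end) is data not yet decrypted; on unencrypted links
 * cdx == cdn == end.  makeroom() above compacts the buffer whenever the
 * writable tail [fifo_end, sizeof(buf)) becomes too small.
 */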
618 
619 /*
620  * Read the next ready message from the ioq, issuing I/O if needed.
621  * Caller should retry on a read-event when NULL is returned.
622  *
623  * If an error occurs during reception a DMSG_LNK_ERROR msg will
624  * be returned for each open transaction, then the ioq and iocom
625  * will be errored out and a non-transactional DMSG_LNK_ERROR
626  * msg will be returned as the final message.  The caller should not call
627  * us again after the final message is returned.
628  *
629  * Thread localized, iocom->mtx not held.
630  */
631 dmsg_msg_t *
632 dmsg_ioq_read(dmsg_iocom_t *iocom)
633 {
634 	dmsg_ioq_t *ioq = &iocom->ioq_rx;
635 	dmsg_msg_t *msg;
636 	dmsg_state_t *state;
637 	dmsg_circuit_t *circuit0;
638 	dmsg_hdr_t *head;
639 	ssize_t n;
640 	size_t bytes;
641 	size_t nmax;
642 	uint32_t aux_size;
643 	uint32_t xcrc32;
644 	int error;
645 
646 again:
647 	/*
648 	 * If a message is already pending we can just remove and
649 	 * return it.  Message state has already been processed.
650 	 * (currently not implemented)
651 	 */
652 	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
653 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
654 		return (msg);
655 	}
656 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
657 
658 	/*
659 	 * If the stream is errored out we stop processing it.
660 	 */
661 	if (ioq->error)
662 		goto skip;
663 
664 	/*
665 	 * Message read in-progress (msg is NULL at the moment).  We don't
666 	 * allocate a msg until we have its core header.
667 	 */
668 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
669 	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
670 	msg = ioq->msg;
671 
672 	switch(ioq->state) {
673 	case DMSG_MSGQ_STATE_HEADER1:
674 		/*
675 		 * Load the primary header, fail on any non-trivial read
676 		 * error or on EOF.  Since the primary header is the same
677 		 * size as the message alignment it will never straddle
678 		 * the end of the buffer.
679 		 */
680 		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
681 		if (bytes < sizeof(msg->any.head)) {
682 			n = read(iocom->sock_fd,
683 				 ioq->buf + ioq->fifo_end,
684 				 nmax);
685 			if (n <= 0) {
686 				if (n == 0) {
687 					ioq->error = DMSG_IOQ_ERROR_EOF;
688 					break;
689 				}
690 				if (errno != EINTR &&
691 				    errno != EINPROGRESS &&
692 				    errno != EAGAIN) {
693 					ioq->error = DMSG_IOQ_ERROR_SOCK;
694 					break;
695 				}
696 				n = 0;
697 				/* fall through */
698 			}
699 			ioq->fifo_end += (size_t)n;
700 			nmax -= (size_t)n;
701 		}
702 
703 		/*
704 		 * Decrypt data received so far.  Data will be decrypted
705 		 * in-place but might create gaps in the FIFO.  Partial
706 		 * blocks are not immediately decrypted.
707 		 *
708 		 * WARNING!  The header might be in the wrong endian, we
709 		 *	     do not fix it up until we get the entire
710 		 *	     extended header.
711 		 */
712 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
713 			dmsg_crypto_decrypt(iocom, ioq);
714 		} else {
715 			ioq->fifo_cdx = ioq->fifo_end;
716 			ioq->fifo_cdn = ioq->fifo_end;
717 		}
718 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
719 
720 		/*
721 		 * Insufficient data accumulated (msg is NULL, caller will
722 		 * retry on event).
723 		 */
724 		assert(msg == NULL);
725 		if (bytes < sizeof(msg->any.head))
726 			break;
727 
728 		/*
729 		 * Check and fixup the core header.  Note that the icrc
730 		 * has to be calculated before any fixups, but the crc
731 		 * fields in the msg may have to be swapped like everything
732 		 * else.
733 		 */
734 		head = (void *)(ioq->buf + ioq->fifo_beg);
735 		if (head->magic != DMSG_HDR_MAGIC &&
736 		    head->magic != DMSG_HDR_MAGIC_REV) {
737 			fprintf(stderr, "%s: head->magic is bad %02x\n",
738 				iocom->label, head->magic);
739 			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
740 				fprintf(stderr, "(on encrypted link)\n");
741 			ioq->error = DMSG_IOQ_ERROR_SYNC;
742 			break;
743 		}
744 
745 		/*
746 		 * Calculate the full header size and aux data size
747 		 */
748 		if (head->magic == DMSG_HDR_MAGIC_REV) {
749 			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
750 				      DMSG_ALIGN;
751 			aux_size = bswap32(head->aux_bytes);
752 		} else {
753 			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
754 				      DMSG_ALIGN;
755 			aux_size = head->aux_bytes;
756 		}
757 		ioq->abytes = DMSG_DOALIGN(aux_size);
758 		ioq->unaligned_aux_size = aux_size;
759 		if (ioq->hbytes < sizeof(msg->any.head) ||
760 		    ioq->hbytes > sizeof(msg->any) ||
761 		    ioq->abytes > DMSG_AUX_MAX) {
762 			ioq->error = DMSG_IOQ_ERROR_FIELD;
763 			break;
764 		}
765 
766 		/*
767 		 * Allocate the message, the next state will fill it in.
768 		 * Note that the aux_data buffer will be sized to an aligned
769 		 * value and the aligned remainder zero'd for convenience.
770 		 */
771 		msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
772 				     ioq->hbytes / DMSG_ALIGN,
773 				     NULL, NULL);
774 		ioq->msg = msg;
775 
776 		/*
777 		 * Fall through to the next state.  Make sure that the
778 		 * extended header does not straddle the end of the buffer.
779 		 * We still want to issue larger reads into our buffer,
780 		 * book-keeping is easier if we don't bcopy() yet.
781 		 *
782 		 * Make sure there is enough room for bloated encrypt data.
783 		 */
784 		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
785 		ioq->state = DMSG_MSGQ_STATE_HEADER2;
786 		/* fall through */
787 	case DMSG_MSGQ_STATE_HEADER2:
788 		/*
789 		 * Fill out the extended header.
790 		 */
791 		assert(msg != NULL);
792 		if (bytes < ioq->hbytes) {
793 			n = read(iocom->sock_fd,
794 				 ioq->buf + ioq->fifo_end,
795 				 nmax);
796 			if (n <= 0) {
797 				if (n == 0) {
798 					ioq->error = DMSG_IOQ_ERROR_EOF;
799 					break;
800 				}
801 				if (errno != EINTR &&
802 				    errno != EINPROGRESS &&
803 				    errno != EAGAIN) {
804 					ioq->error = DMSG_IOQ_ERROR_SOCK;
805 					break;
806 				}
807 				n = 0;
808 				/* fall through */
809 			}
810 			ioq->fifo_end += (size_t)n;
811 			nmax -= (size_t)n;
812 		}
813 
814 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
815 			dmsg_crypto_decrypt(iocom, ioq);
816 		} else {
817 			ioq->fifo_cdx = ioq->fifo_end;
818 			ioq->fifo_cdn = ioq->fifo_end;
819 		}
820 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
821 
822 		/*
823 		 * Insufficient data accumulated (set msg NULL so caller will
824 		 * retry on event).
825 		 */
826 		if (bytes < ioq->hbytes) {
827 			msg = NULL;
828 			break;
829 		}
830 
831 		/*
832 		 * Calculate the extended header, decrypt data received
833 		 * so far.  Handle endian-conversion for the entire extended
834 		 * header.
835 		 */
836 		head = (void *)(ioq->buf + ioq->fifo_beg);
837 
838 		/*
839 		 * Check the CRC.
840 		 */
841 		if (head->magic == DMSG_HDR_MAGIC_REV)
842 			xcrc32 = bswap32(head->hdr_crc);
843 		else
844 			xcrc32 = head->hdr_crc;
845 		head->hdr_crc = 0;
846 		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
847 			ioq->error = DMSG_IOQ_ERROR_XCRC;
848 			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
849 				xcrc32, dmsg_icrc32(head, ioq->hbytes),
850 				dmsg_msg_str(msg));
851 			assert(0);
852 			break;
853 		}
854 		head->hdr_crc = xcrc32;
855 
856 		if (head->magic == DMSG_HDR_MAGIC_REV) {
857 			dmsg_bswap_head(head);
858 		}
859 
860 		/*
861 		 * Copy the extended header into the msg and adjust the
862 		 * FIFO.
863 		 */
864 		bcopy(head, &msg->any, ioq->hbytes);
865 
866 		/*
867 		 * We are either done or we fall-through.
868 		 */
869 		if (ioq->abytes == 0) {
870 			ioq->fifo_beg += ioq->hbytes;
871 			break;
872 		}
873 
874 		/*
875 		 * Must adjust bytes (and the state) when falling through.
876 		 * nmax doesn't change.
877 		 */
878 		ioq->fifo_beg += ioq->hbytes;
879 		bytes -= ioq->hbytes;
880 		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
881 		/* fall through */
882 	case DMSG_MSGQ_STATE_AUXDATA1:
883 		/*
884 		 * Copy the partial or complete [decrypted] payload from
885 		 * remaining bytes in the FIFO in order to optimize the
886 		 * makeroom call in the AUXDATA2 state.  We have to
887 		 * fall-through either way so we can check the crc.
888 		 *
889 		 * msg->aux_size tracks our aux data.
890 		 *
891 		 * (Let's not complicate matters if the data is encrypted,
892 		 *  since the data in-stream is not the same size as the
893 		 *  data decrypted).
894 		 */
895 		if (bytes >= ioq->abytes) {
896 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
897 			      ioq->abytes);
898 			msg->aux_size = ioq->abytes;
899 			ioq->fifo_beg += ioq->abytes;
900 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
901 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
902 			bytes -= ioq->abytes;
903 		} else if (bytes) {
904 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
905 			      bytes);
906 			msg->aux_size = bytes;
907 			ioq->fifo_beg += bytes;
908 			if (ioq->fifo_cdx < ioq->fifo_beg)
909 				ioq->fifo_cdx = ioq->fifo_beg;
910 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
911 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
912 			bytes = 0;
913 		} else {
914 			msg->aux_size = 0;
915 		}
916 		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
917 		/* fall through */
918 	case DMSG_MSGQ_STATE_AUXDATA2:
919 		/*
920 		 * Make sure there is enough room for more data.
921 		 */
922 		assert(msg);
923 		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
924 
925 		/*
926 		 * Read and decrypt more of the payload.
927 		 */
928 		if (msg->aux_size < ioq->abytes) {
929 			assert(bytes == 0);
930 			n = read(iocom->sock_fd,
931 				 ioq->buf + ioq->fifo_end,
932 				 nmax);
933 			if (n <= 0) {
934 				if (n == 0) {
935 					ioq->error = DMSG_IOQ_ERROR_EOF;
936 					break;
937 				}
938 				if (errno != EINTR &&
939 				    errno != EINPROGRESS &&
940 				    errno != EAGAIN) {
941 					ioq->error = DMSG_IOQ_ERROR_SOCK;
942 					break;
943 				}
944 				n = 0;
945 				/* fall through */
946 			}
947 			ioq->fifo_end += (size_t)n;
948 			nmax -= (size_t)n;
949 		}
950 
951 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
952 			dmsg_crypto_decrypt(iocom, ioq);
953 		} else {
954 			ioq->fifo_cdx = ioq->fifo_end;
955 			ioq->fifo_cdn = ioq->fifo_end;
956 		}
957 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
958 
959 		if (bytes > ioq->abytes - msg->aux_size)
960 			bytes = ioq->abytes - msg->aux_size;
961 
962 		if (bytes) {
963 			bcopy(ioq->buf + ioq->fifo_beg,
964 			      msg->aux_data + msg->aux_size,
965 			      bytes);
966 			msg->aux_size += bytes;
967 			ioq->fifo_beg += bytes;
968 		}
969 
970 		/*
971 		 * Insufficient data accumulated (set msg NULL so caller will
972 		 * retry on event).
973 		 *
974 		 * Assert the auxiliary data size is correct, then record the
975 		 * original unaligned size from the message header.
976 		 */
977 		if (msg->aux_size < ioq->abytes) {
978 			msg = NULL;
979 			break;
980 		}
981 		assert(msg->aux_size == ioq->abytes);
982 		msg->aux_size = ioq->unaligned_aux_size;
983 
984 		/*
985 		 * Check aux_crc, then we are done.  Note that the crc
986 		 * is calculated over the aligned size, not the actual
987 		 * size.
988 		 */
989 		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
990 		if (xcrc32 != msg->any.head.aux_crc) {
991 			ioq->error = DMSG_IOQ_ERROR_ACRC;
992 			fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
993 				xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
994 			break;
995 		}
996 		break;
997 	case DMSG_MSGQ_STATE_ERROR:
998 		/*
999 		 * Continued calls to drain recorded transactions (returning
1000 		 * a LNK_ERROR for each one), before we return the final
1001 		 * LNK_ERROR.
1002 		 */
1003 		assert(msg == NULL);
1004 		break;
1005 	default:
1006 		/*
1007 		 * We don't double-return errors, the caller should not
1008 		 * have called us again after getting an error msg.
1009 		 */
1010 		assert(0);
1011 		break;
1012 	}
1013 
1014 	/*
1015 	 * Check the message sequence.  The iv[] should prevent any
1016 	 * possibility of a replay but we add this check anyway.
1017 	 */
1018 	if (msg && ioq->error == 0) {
1019 		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1020 			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1021 		} else {
1022 			++ioq->seq;
1023 		}
1024 	}
1025 
1026 	/*
1027 	 * Handle error, RREQ, or completion
1028 	 *
1029 	 * NOTE: nmax and bytes are invalid at this point, we don't bother
1030 	 *	 to update them when breaking out.
1031 	 */
1032 	if (ioq->error) {
1033 skip:
1034 		fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1035 		/*
1036 		 * An unrecoverable error causes all active receive
1037 		 * transactions to be terminated with a LNK_ERROR message.
1038 		 *
1039 		 * Once all active transactions are exhausted we set the
1040 		 * iocom ERROR flag and return a non-transactional LNK_ERROR
1041 		 * message, which should cause master processing loops to
1042 		 * terminate.
1043 		 */
1044 		assert(ioq->msg == msg);
1045 		if (msg) {
1046 			dmsg_msg_free(msg);
1047 			ioq->msg = NULL;
1048 		}
1049 
1050 		/*
1051 		 * No more I/O read processing
1052 		 */
1053 		ioq->state = DMSG_MSGQ_STATE_ERROR;
1054 
1055 		/*
1056 		 * Simulate a remote LNK_ERROR DELETE msg for any open
1057 		 * transactions, ending with a final non-transactional
1058 		 * LNK_ERROR (that the session can detect) when no
1059 		 * transactions remain.
1060 		 *
1061 		 * We only need to scan transactions on circuit0 as these
1062 		 * will contain all circuit forges, and terminating circuit
1063 		 * forges will automatically terminate the transactions on
1064 		 * any other circuits as well as those circuits.
1065 		 */
1066 		circuit0 = &iocom->circuit0;
1067 		msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
1068 		msg->any.head.error = ioq->error;
1069 
1070 		pthread_mutex_lock(&iocom->mtx);
1071 		dmsg_iocom_drain(iocom);
1072 
1073 		if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1074 			/*
1075 			 * Active remote transactions are still present.
1076 			 * Simulate the other end sending us a DELETE.
1077 			 */
1078 			if (state->rxcmd & DMSGF_DELETE) {
1079 				dmsg_msg_free(msg);
1080 				fprintf(stderr,
1081 					"iocom: ioq error(rd) %d sleeping "
1082 					"state %p rxcmd %08x txcmd %08x "
1083 					"func %p\n",
1084 					ioq->error, state, state->rxcmd,
1085 					state->txcmd, state->func);
1086 				usleep(100000);	/* XXX */
1087 				atomic_set_int(&iocom->flags,
1088 					       DMSG_IOCOMF_RWORK);
1089 				msg = NULL;
1090 			} else {
1091 				/*state->txcmd |= DMSGF_DELETE;*/
1092 				msg->state = state;
1093 				msg->iocom = iocom;
1094 				msg->any.head.msgid = state->msgid;
1095 				msg->any.head.cmd |= DMSGF_ABORT |
1096 						     DMSGF_DELETE;
1097 			}
1098 		} else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
1099 			/*
1100 			 * Active local transactions are still present.
1101 			 * Simulate the other end sending us a DELETE.
1102 			 */
1103 			if (state->rxcmd & DMSGF_DELETE) {
1104 				dmsg_msg_free(msg);
1105 				fprintf(stderr,
1106 					"iocom: ioq error(wr) %d sleeping "
1107 					"state %p rxcmd %08x txcmd %08x "
1108 					"func %p\n",
1109 					ioq->error, state, state->rxcmd,
1110 					state->txcmd, state->func);
1111 				usleep(100000);	/* XXX */
1112 				atomic_set_int(&iocom->flags,
1113 					       DMSG_IOCOMF_RWORK);
1114 				msg = NULL;
1115 			} else {
1116 				msg->state = state;
1117 				msg->iocom = iocom;
1118 				msg->any.head.msgid = state->msgid;
1119 				msg->any.head.cmd |= DMSGF_ABORT |
1120 						     DMSGF_DELETE |
1121 						     DMSGF_REPLY;
1122 				if ((state->rxcmd & DMSGF_CREATE) == 0) {
1123 					msg->any.head.cmd |=
1124 						     DMSGF_CREATE;
1125 				}
1126 			}
1127 		} else {
1128 			/*
1129 			 * No active local or remote transactions remain.
1130 			 * Generate a final LNK_ERROR and flag EOF.
1131 			 */
1132 			msg->state = NULL;
1133 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1134 			fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1135 		}
1136 		pthread_mutex_unlock(&iocom->mtx);
1137 
1138 		/*
1139 		 * For the iocom error case we want to set RWORK to indicate
1140 		 * that more messages might be pending.
1141 		 *
1142 		 * It is possible to return NULL when there is more work to
1143 		 * do because each message has to be DELETEd in both
1144 		 * directions before we continue on with the next (though
1145 		 * this could be optimized).  The transmit direction will
1146 		 * re-set RWORK.
1147 		 */
1148 		if (msg)
1149 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1150 	} else if (msg == NULL) {
1151 		/*
1152 		 * Insufficient data received to finish building the message,
1153 		 * set RREQ and return NULL.
1154 		 *
1155 		 * Leave ioq->msg intact.
1156 		 * Leave the FIFO intact.
1157 		 */
1158 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1159 	} else {
1160 		/*
1161 		 * Continue processing msg.
1162 		 *
1163 		 * The fifo has already been advanced past the message.
1164 		 * Trivially reset the FIFO indices if possible.
1165 		 *
1166 		 * Clear the FIFO if it is now empty and set RREQ to wait
1167 		 * for more from the socket.  If the FIFO is not empty set
1168 		 * RWORK to bypass the poll so we loop immediately.
1169 		 */
1170 		if (ioq->fifo_beg == ioq->fifo_cdx &&
1171 		    ioq->fifo_cdn == ioq->fifo_end) {
1172 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1173 			ioq->fifo_cdx = 0;
1174 			ioq->fifo_cdn = 0;
1175 			ioq->fifo_beg = 0;
1176 			ioq->fifo_end = 0;
1177 		} else {
1178 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1179 		}
1180 		ioq->state = DMSG_MSGQ_STATE_HEADER1;
1181 		ioq->msg = NULL;
1182 
1183 		/*
1184 		 * Handle message routing.  Validates non-zero sources
1185 		 * and routes message.  Error will be 0 if the message is
1186 		 * destined for us.
1187 		 *
1188 		 * State processing only occurs for messages destined for us.
1189 		 */
1190 		if (DMsgDebugOpt >= 5) {
1191 			fprintf(stderr,
1192 				"rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1193 				msg->any.head.cmd,
1194 				(intmax_t)msg->any.head.msgid,
1195 				(intmax_t)msg->any.head.circuit);
1196 		}
1197 		if (msg->any.head.circuit)
1198 			error = dmsg_circuit_route(msg);
1199 		else
1200 			error = dmsg_state_msgrx(msg);
1201 
1202 		if (error) {
1203 			/*
1204 			 * Abort-after-closure, throw message away and
1205 			 * start reading another.
1206 			 */
1207 			if (error == DMSG_IOQ_ERROR_EALREADY) {
1208 				dmsg_msg_free(msg);
1209 				goto again;
1210 			}
1211 
1212 			/*
1213 			 * msg routed, msg pointer no longer owned by us.
1214 			 * Go to the top and start reading another.
1215 			 */
1216 			if (error == DMSG_IOQ_ERROR_ROUTED)
1217 				goto again;
1218 
1219 			/*
1220 			 * Process real error and throw away message.
1221 			 */
1222 			ioq->error = error;
1223 			goto skip;
1224 		}
1225 		/* no error, not routed.  Fall through and return msg */
1226 	}
1227 	return (msg);
1228 }
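
/*
 * Illustrative sketch (not part of the original source): given a header
 * 'head' already converted to host endianness, the total plaintext the
 * reader above consumes for one message is the aligned header plus the
 * aligned aux data.
 */
#if 0
	size_t hbytes = (head->cmd & DMSGF_SIZE) * DMSG_ALIGN;
	size_t abytes = DMSG_DOALIGN(head->aux_bytes);
	size_t wire_bytes = hbytes + abytes;
#endif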
1229 
1230 /*
1231  * Calculate the header and data crc's and write a low-level message to
1232  * the connection.  If aux_crc is non-zero the aux_data crc is already
1233  * assumed to have been set.
1234  *
1235  * A non-NULL msg is added to the queue but not necessarily flushed.
1236  * Calling this function with msg == NULL will get a flush going.
1237  *
1238  * (called from iocom_core only)
1239  */
1240 void
1241 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1242 {
1243 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1244 	dmsg_msg_t *msg;
1245 	uint32_t xcrc32;
1246 	size_t hbytes;
1247 	size_t abytes;
1248 	dmsg_msg_queue_t tmpq;
1249 
1250 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1251 	TAILQ_INIT(&tmpq);
1252 	pthread_mutex_lock(&iocom->mtx);
1253 	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1254 		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1255 		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1256 	}
1257 	pthread_mutex_unlock(&iocom->mtx);
1258 
1259 	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1260 		/*
1261 		 * Process terminal connection errors.
1262 		 */
1263 		TAILQ_REMOVE(&tmpq, msg, qentry);
1264 		if (ioq->error) {
1265 			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1266 			++ioq->msgcount;
1267 			continue;
1268 		}
1269 
1270 		/*
1271 		 * Finish populating the msg fields.  The salt ensures that
1272 		 * the iv[] array is ridiculously randomized and we also
1273 		 * re-seed our PRNG every 32768 messages just to be sure.
1274 		 */
1275 		msg->any.head.magic = DMSG_HDR_MAGIC;
1276 		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1277 		++ioq->seq;
1278 		if ((ioq->seq & 32767) == 0)
1279 			srandomdev();
1280 
1281 		/*
1282 		 * Calculate aux_crc if 0, then calculate hdr_crc.
1283 		 */
1284 		if (msg->aux_size && msg->any.head.aux_crc == 0) {
1285 			abytes = DMSG_DOALIGN(msg->aux_size);
1286 			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1287 			msg->any.head.aux_crc = xcrc32;
1288 		}
1289 		msg->any.head.aux_bytes = msg->aux_size;
1290 
1291 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1292 			 DMSG_ALIGN;
1293 		msg->any.head.hdr_crc = 0;
1294 		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1295 
1296 		/*
1297 		 * Enqueue the message (the flush code handles stream
1298 		 * encryption).
1299 		 */
1300 		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1301 		++ioq->msgcount;
1302 	}
1303 	dmsg_iocom_flush2(iocom);
1304 }
1305 
1306 /*
1307  * Thread localized, iocom->mtx not held by caller.
1308  *
1309  * (called from iocom_core via iocom_flush1 only)
1310  */
1311 void
1312 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1313 {
1314 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1315 	dmsg_msg_t *msg;
1316 	ssize_t n;
1317 	struct iovec iov[DMSG_IOQ_MAXIOVEC];
1318 	size_t nact;
1319 	size_t hbytes;
1320 	size_t abytes;
1321 	size_t hoff;
1322 	size_t aoff;
1323 	int iovcnt;
1324 
1325 	if (ioq->error) {
1326 		dmsg_iocom_drain(iocom);
1327 		return;
1328 	}
1329 
1330 	/*
1331 	 * Pump messages out the connection by building an iovec.
1332 	 *
1333 	 * ioq->hbytes/ioq->abytes tracks how much of the first message
1334 	 * in the queue has been successfully written out, so we can
1335 	 * resume writing.
1336 	 */
1337 	iovcnt = 0;
1338 	nact = 0;
1339 	hoff = ioq->hbytes;
1340 	aoff = ioq->abytes;
1341 
1342 	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1343 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1344 			 DMSG_ALIGN;
1345 		abytes = DMSG_DOALIGN(msg->aux_size);
1346 		assert(hoff <= hbytes && aoff <= abytes);
1347 
1348 		if (hoff < hbytes) {
1349 			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1350 			iov[iovcnt].iov_len = hbytes - hoff;
1351 			nact += hbytes - hoff;
1352 			++iovcnt;
1353 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1354 				break;
1355 		}
1356 		if (aoff < abytes) {
1357 			assert(msg->aux_data != NULL);
1358 			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1359 			iov[iovcnt].iov_len = abytes - aoff;
1360 			nact += abytes - aoff;
1361 			++iovcnt;
1362 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1363 				break;
1364 		}
1365 		hoff = 0;
1366 		aoff = 0;
1367 	}
1368 	if (iovcnt == 0)
1369 		return;
1370 
1371 	/*
1372 	 * Encrypt and write the data.  The crypto code will move the
1373 	 * data into the fifo and adjust the iov as necessary.  If
1374 	 * encryption is disabled the iov is left alone.
1375 	 *
1376 	 * May return a smaller iov (thus a smaller n), with aggregated
1377 	 * chunks.  May reduce nmax to what fits in the FIFO.
1378 	 *
1379 	 * This function sets nact to the number of original bytes now
1380 	 * encrypted, adding to the FIFO some number of bytes that might
1381 	 * be greater depending on the crypto mechanic.  iov[] is adjusted
1382 	 * to point at the FIFO if necessary.
1383 	 *
1384 	 * NOTE: The return value from the writev() is the post-encrypted
1385 	 *	 byte count, not the plaintext count.
1386 	 */
1387 	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1388 		/*
1389 		 * Make sure the FIFO has a reasonable amount of space
1390 		 * left (if not completely full).
1391 		 *
1392 		 * In this situation we are staging the encrypted message
1393 		 * data in the FIFO.  (nact) represents how much plaintext
1394 		 * has been staged, (n) represents how much encrypted data
1395 		 * has been flushed.  The two are independent of each other.
1396 		 */
1397 		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1398 		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1399 			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1400 			      ioq->fifo_end - ioq->fifo_beg);
1401 			ioq->fifo_cdx -= ioq->fifo_beg;
1402 			ioq->fifo_cdn -= ioq->fifo_beg;
1403 			ioq->fifo_end -= ioq->fifo_beg;
1404 			ioq->fifo_beg = 0;
1405 		}
1406 
1407 		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1408 		n = writev(iocom->sock_fd, iov, iovcnt);
1409 		if (n > 0) {
1410 			ioq->fifo_beg += n;
1411 			ioq->fifo_cdn += n;
1412 			ioq->fifo_cdx += n;
1413 			if (ioq->fifo_beg == ioq->fifo_end) {
1414 				ioq->fifo_beg = 0;
1415 				ioq->fifo_cdn = 0;
1416 				ioq->fifo_cdx = 0;
1417 				ioq->fifo_end = 0;
1418 			}
1419 		}
1420 		/*
1421 		 * We don't mess with the nact returned by the crypto_encrypt
1422 		 * call, which represents the filling of the FIFO.  (n) tells
1423 		 * us how much we were able to write from the FIFO.  The two
1424 		 * are different beasts when encrypting.
1425 		 */
1426 	} else {
1427 		/*
1428 		 * In this situation we are not staging the messages to the
1429 		 * FIFO but instead writing them directly from the msg
1430 		 * structure(s), so (nact) is basically (n).
1431 		 */
1432 		n = writev(iocom->sock_fd, iov, iovcnt);
1433 		if (n > 0)
1434 			nact = n;
1435 		else
1436 			nact = 0;
1437 	}
1438 
1439 	/*
1440 	 * Clean out the transmit queue based on what we successfully
1441 	 * sent (nact is the plaintext count).  ioq->hbytes/abytes
1442 	 * represents the portion of the first message previously sent.
1443 	 */
1444 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1445 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1446 			 DMSG_ALIGN;
1447 		abytes = DMSG_DOALIGN(msg->aux_size);
1448 
1449 		if ((size_t)nact < hbytes - ioq->hbytes) {
1450 			ioq->hbytes += nact;
1451 			nact = 0;
1452 			break;
1453 		}
1454 		nact -= hbytes - ioq->hbytes;
1455 		ioq->hbytes = hbytes;
1456 		if ((size_t)nact < abytes - ioq->abytes) {
1457 			ioq->abytes += nact;
1458 			nact = 0;
1459 			break;
1460 		}
1461 		nact -= abytes - ioq->abytes;
1462 		/* ioq->abytes = abytes; optimized out */
1463 
1464 		if (DMsgDebugOpt >= 5) {
1465 			fprintf(stderr,
1466 				"txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1467 				msg->any.head.cmd,
1468 				(intmax_t)msg->any.head.msgid,
1469 				(intmax_t)msg->any.head.circuit);
1470 		}
1471 
1472 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1473 		--ioq->msgcount;
1474 		ioq->hbytes = 0;
1475 		ioq->abytes = 0;
1476 
1477 		dmsg_state_cleanuptx(msg);
1478 	}
1479 	assert(nact == 0);
1480 
1481 	/*
1482 	 * Process the return value from the write w/regards to blocking.
1483 	 */
1484 	if (n < 0) {
1485 		if (errno != EINTR &&
1486 		    errno != EINPROGRESS &&
1487 		    errno != EAGAIN) {
1488 			/*
1489 			 * Fatal write error
1490 			 */
1491 			ioq->error = DMSG_IOQ_ERROR_SOCK;
1492 			dmsg_iocom_drain(iocom);
1493 		} else {
1494 			/*
1495 			 * Wait for socket buffer space
1496 			 */
1497 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1498 		}
1499 	} else {
1500 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1501 	}
1502 	if (ioq->error) {
1503 		dmsg_iocom_drain(iocom);
1504 	}
1505 }
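
/*
 * Illustrative note (not part of the original source): a short write only
 * consumes part of the first queued message.  ioq->hbytes/ioq->abytes
 * remember how far into that message's header and aux data the write got,
 * so the next flush resumes at hoff/aoff instead of re-sending bytes.
 */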
1506 
1507 /*
1508  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1509  * write events will occur.  We don't kill read msgs because we want
1510  * the caller to pull off our contrived terminal error msg to detect
1511  * the connection failure.
1512  *
1513  * Localized to iocom_core thread, iocom->mtx not held by caller.
1514  */
1515 void
1516 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1517 {
1518 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1519 	dmsg_msg_t *msg;
1520 
1521 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1522 	ioq->hbytes = 0;
1523 	ioq->abytes = 0;
1524 
1525 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1526 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1527 		--ioq->msgcount;
1528 		dmsg_state_cleanuptx(msg);
1529 	}
1530 }
1531 
1532 /*
1533  * Write a message to an iocom, with additional state processing.
1534  */
1535 void
1536 dmsg_msg_write(dmsg_msg_t *msg)
1537 {
1538 	dmsg_iocom_t *iocom = msg->iocom;
1539 	dmsg_state_t *state;
1540 	char dummy;
1541 
1542 	/*
1543 	 * Handle state processing, create state if necessary.
1544 	 */
1545 	pthread_mutex_lock(&iocom->mtx);
1546 	if ((state = msg->state) != NULL) {
1547 		/*
1548 		 * Existing transaction (could be reply).  It is also
1549 		 * possible for this to be the first reply (CREATE is set),
1550 		 * in which case we populate state->txcmd.
1551 		 *
1552 		 * state->txcmd is adjusted to hold the final message cmd,
1553 		 * and we also make sure to set the CREATE bit here.  We did
1554 		 * not set it in dmsg_msg_alloc() because that would have
1555 		 * not been serialized (state could have gotten ripped out
1556 		 * from under the message prior to it being transmitted).
1557 		 */
1558 		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1559 		    DMSGF_CREATE) {
1560 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1561 			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1562 		}
1563 		msg->any.head.msgid = state->msgid;
1564 		assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1565 		if (msg->any.head.cmd & DMSGF_CREATE) {
1566 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1567 		}
1568 	}
1569 
1570 	/*
1571 	 * Queue it for output, wake up the I/O pthread.  Note that the
1572 	 * I/O thread is responsible for generating the CRCs and encryption.
1573 	 */
1574 	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1575 	dummy = 0;
1576 	write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
1577 	pthread_mutex_unlock(&iocom->mtx);
1578 }
1579 
1580 /*
1581  * This is a shortcut to formulate a reply to msg with a simple error code.
1582  * It can reply to and terminate a transaction, or it can reply to a one-way
1583  * message.  A DMSG_LNK_ERROR command code is utilized to encode
1584  * the error code (which can be 0).  Not all transactions are terminated
1585  * with DMSG_LNK_ERROR status (the low level only cares about the
1586  * MSGF_DELETE flag), but most are.
1587  *
1588  * Replies to one-way messages are a bit of an oxymoron but the feature
1589  * is used by the debug (DBG) protocol.
1590  *
1591  * The reply contains no extended data.
1592  */
1593 void
1594 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1595 {
1596 	dmsg_state_t *state = msg->state;
1597 	dmsg_msg_t *nmsg;
1598 	uint32_t cmd;
1599 
1600 
1601 	/*
1602 	 * Reply with a simple error code and terminate the transaction.
1603 	 */
1604 	cmd = DMSG_LNK_ERROR;
1605 
1606 	/*
1607 	 * Check if our direction has even been initiated yet, set CREATE.
1608 	 *
1609 	 * Check what direction this is (command or reply direction).  Note
1610 	 * that txcmd might not have been initiated yet.
1611 	 *
1612 	 * If our direction has already been closed we just return without
1613 	 * doing anything.
1614 	 */
1615 	if (state) {
1616 		if (state->txcmd & DMSGF_DELETE)
1617 			return;
1618 		if (state->txcmd & DMSGF_REPLY)
1619 			cmd |= DMSGF_REPLY;
1620 		cmd |= DMSGF_DELETE;
1621 	} else {
1622 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1623 			cmd |= DMSGF_REPLY;
1624 	}
1625 
1626 	/*
1627 	 * Allocate the message and associate it with the existing state.
1628 	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1629 	 * allocate new state.  We have our state already.
1630 	 */
1631 	nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1632 	if (state) {
1633 		if ((state->txcmd & DMSGF_CREATE) == 0)
1634 			nmsg->any.head.cmd |= DMSGF_CREATE;
1635 	}
1636 	nmsg->any.head.error = error;
1637 	nmsg->any.head.msgid = msg->any.head.msgid;
1638 	nmsg->any.head.circuit = msg->any.head.circuit;
1639 	nmsg->state = state;
1640 	dmsg_msg_write(nmsg);
1641 }
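
/*
 * Illustrative usage sketch (not part of the original source): a receive
 * callback can terminate an unrecognized incoming transaction with a
 * simple error reply.  DMSG_ERR_NOSUPP is assumed here as an example
 * error code; any uint32_t value (including 0) is accepted.
 */
#if 0
	if (msg->any.head.cmd & DMSGF_CREATE)
		dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
#endif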
1642 
1643 /*
1644  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1645  * we are generating a streaming reply or an intermediate acknowledgement
1646  * of some sort as part of the higher level protocol, with more to come
1647  * later.
1648  */
1649 void
1650 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1651 {
1652 	dmsg_state_t *state = msg->state;
1653 	dmsg_msg_t *nmsg;
1654 	uint32_t cmd;
1655 
1656 
1657 	/*
1658 	 * Reply with a simple error code and terminate the transaction.
1659 	 */
1660 	cmd = DMSG_LNK_ERROR;
1661 
1662 	/*
1663 	 * Check if our direction has even been initiated yet, set CREATE.
1664 	 *
1665 	 * Check what direction this is (command or reply direction).  Note
1666 	 * that txcmd might not have been initiated yet.
1667 	 *
1668 	 * If our direction has already been closed we just return without
1669 	 * doing anything.
1670 	 */
1671 	if (state) {
1672 		if (state->txcmd & DMSGF_DELETE)
1673 			return;
1674 		if (state->txcmd & DMSGF_REPLY)
1675 			cmd |= DMSGF_REPLY;
1676 		/* continuing transaction, do not set MSGF_DELETE */
1677 	} else {
1678 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1679 			cmd |= DMSGF_REPLY;
1680 	}
1681 
1682 	nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1683 	if (state) {
1684 		if ((state->txcmd & DMSGF_CREATE) == 0)
1685 			nmsg->any.head.cmd |= DMSGF_CREATE;
1686 	}
1687 	nmsg->any.head.error = error;
1688 	nmsg->any.head.msgid = msg->any.head.msgid;
1689 	nmsg->any.head.circuit = msg->any.head.circuit;
1690 	nmsg->state = state;
1691 	dmsg_msg_write(nmsg);
1692 }
1693 
1694 /*
1695  * Terminate a transaction given a state structure by issuing a DELETE.
1696  */
1697 void
1698 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1699 {
1700 	dmsg_msg_t *nmsg;
1701 	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1702 
1703 	/*
1704 	 * Nothing to do if we already transmitted a delete
1705 	 */
1706 	if (state->txcmd & DMSGF_DELETE)
1707 		return;
1708 
1709 	/*
1710 	 * Set REPLY if the other end initiated the command.  Otherwise
1711 	 * we are the command direction.
1712 	 */
1713 	if (state->txcmd & DMSGF_REPLY)
1714 		cmd |= DMSGF_REPLY;
1715 
1716 	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1717 	if (state) {
1718 		if ((state->txcmd & DMSGF_CREATE) == 0)
1719 			nmsg->any.head.cmd |= DMSGF_CREATE;
1720 	}
1721 	nmsg->any.head.error = error;
1722 	nmsg->any.head.msgid = state->msgid;
1723 	nmsg->any.head.circuit = state->msg->any.head.circuit;
1724 	nmsg->state = state;
1725 	dmsg_msg_write(nmsg);
1726 }
1727 
1728 /*
1729  * Terminate a transaction given a state structure by issuing a DELETE.
1730  */
1731 void
1732 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1733 {
1734 	dmsg_msg_t *nmsg;
1735 	uint32_t cmd = DMSG_LNK_ERROR;
1736 
1737 	/*
1738 	 * Nothing to do if we already transmitted a delete
1739 	 */
1740 	if (state->txcmd & DMSGF_DELETE)
1741 		return;
1742 
1743 	/*
1744 	 * Set REPLY if the other end initiated the command.  Otherwise
1745 	 * we are the command direction.
1746 	 */
1747 	if (state->txcmd & DMSGF_REPLY)
1748 		cmd |= DMSGF_REPLY;
1749 
1750 	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1751 	if (state) {
1752 		if ((state->txcmd & DMSGF_CREATE) == 0)
1753 			nmsg->any.head.cmd |= DMSGF_CREATE;
1754 	}
1755 	nmsg->any.head.error = error;
1756 	nmsg->any.head.msgid = state->msgid;
1757 	nmsg->any.head.circuit = state->msg->any.head.circuit;
1758 	nmsg->state = state;
1759 	dmsg_msg_write(nmsg);
1760 }
1761 
1762 /************************************************************************
1763  *			TRANSACTION STATE HANDLING			*
1764  ************************************************************************
1765  *
1766  */
1767 
1768 /*
1769  * Process circuit and state tracking for a message after reception, prior
1770  * to execution.
1771  *
1772  * Called with msglk held and the msg dequeued.
1773  *
1774  * All messages are called with dummy state and return actual state.
1775  * (One-off messages often just return the same dummy state).
1776  *
1777  * May request that caller discard the message by setting *discardp to 1.
1778  * The returned state is not used in this case and is allowed to be NULL.
1779  *
1780  * --
1781  *
1782  * These routines handle persistent and command/reply message state via the
1783  * CREATE and DELETE flags.  The first message in a command or reply sequence
1784  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1785  *
1786  * There can be any number of intermediate messages belonging to the same
1787  * sequence sent in between the CREATE message and the DELETE message,
1788  * which set neither flag.  This represents a streaming command or reply.
1789  *
1790  * Any command message received with CREATE set expects a reply sequence to
1791  * be returned.  Reply sequences work the same as command sequences except the
1792  * REPLY bit is also sent.  Both the command side and reply side can
1793  * degenerate into a single message with both CREATE and DELETE set.  Note
1794  * that one side can be streaming and the other side not, or neither, or both.
1795  *
1796  * The msgid is unique for the initiator.  That is, two sides sending a new
1797  * message can use the same msgid without colliding.
1798  *
1799  * --
1800  *
1801  * ABORT sequences work by setting the ABORT flag along with normal message
1802  * state.  However, ABORTs can also be sent on half-closed messages, that is
1803  * even if the command or reply side has already sent a DELETE, as long as
1804  * the message has not been fully closed it can still send an ABORT+DELETE
1805  * to terminate the half-closed message state.
1806  *
1807  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1808  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1809  * also race, and in this situation the other side might have already
1810  * initiated a new unrelated command with the same message id.  Since
1811  * the abort has not set the CREATE flag the situation can be detected
1812  * and the message will also be discarded.
1813  *
1814  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1815  * The ABORT request is essentially integrated into the command instead
1816  * of being sent later on.  In this situation the command implementation
1817  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1818  * special-case non-blocking operation for the command.
1819  *
1820  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1821  *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
1822  *	  one-way messages are not supported.
1823  *
1824  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1825  *	  simply ignored.
1826  *
1827  * --
1828  *
1829  * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1830  * set.  One-off messages cannot be aborted and typically aren't processed
1831  * by these routines.  The REPLY bit can be used to distinguish whether a
1832  * one-off message is a command or reply.  For example, one-off replies
1833  * will typically just contain status updates.
1834  */
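
/*
 * Illustrative sketch only (compiled out, not part of the library): sending
 * an intermediate (streaming) message on an already-open transaction, with
 * neither CREATE nor DELETE set, mirroring the pattern dmsg_state_result()
 * uses above.  The function name is an assumption; DMSG_LNK_ERROR is used
 * purely as a stand-in command code.
 */
#if 0
static void
example_stream_update(dmsg_state_t *state, uint32_t error)
{
	dmsg_msg_t *nmsg;
	uint32_t cmd = DMSG_LNK_ERROR;

	/* our half of the transaction must already be open */
	assert(state->txcmd & DMSGF_CREATE);

	/* match the direction of our half of the transaction */
	if (state->txcmd & DMSGF_REPLY)
		cmd |= DMSGF_REPLY;

	/* neither CREATE nor DELETE: a mid-stream message */
	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
	nmsg->any.head.error = error;
	nmsg->any.head.msgid = state->msgid;
	nmsg->any.head.circuit = state->msg->any.head.circuit;
	nmsg->state = state;
	dmsg_msg_write(nmsg);
}
#endif
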
1835 static int
1836 dmsg_state_msgrx(dmsg_msg_t *msg)
1837 {
1838 	dmsg_iocom_t *iocom = msg->iocom;
1839 	dmsg_circuit_t *circuit;
1840 	dmsg_circuit_t *ocircuit;
1841 	dmsg_state_t *state;
1842 	dmsg_state_t sdummy;
1843 	dmsg_circuit_t cdummy;
1844 	int error;
1845 
1846 	pthread_mutex_lock(&iocom->mtx);
1847 
1848 	/*
1849 	 * Locate existing persistent circuit and state, if any.
1850 	 */
1851 	if (msg->any.head.circuit == 0) {
1852 		circuit = &iocom->circuit0;
1853 	} else {
1854 		cdummy.msgid = msg->any.head.circuit;
1855 		circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1856 				  &cdummy);
1857 		if (circuit == NULL) {
1858 			pthread_mutex_unlock(&iocom->mtx);
1859 			return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1860 		}
1861 	}
1862 
1863 	/*
1864 	 * Replace the temporary circuit0 binding with the actual circuit.
1865 	 */
1866 	dmsg_circuit_hold(circuit);
1867 	ocircuit = msg->circuit;
1868 	msg->circuit = circuit;
1869 
1870 	/*
1871 	 * If the received msg is a command, its state is on the staterd_tree.
1872 	 * If the received msg is a reply, its state is on the statewr_tree.
1873 	 */
1874 	sdummy.msgid = msg->any.head.msgid;
1875 	if (msg->any.head.cmd & DMSGF_REPLY) {
1876 		state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1877 				&sdummy);
1878 	} else {
1879 		state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1880 				&sdummy);
1881 	}
1882 	msg->state = state;
1883 
1884 	pthread_mutex_unlock(&iocom->mtx);
1885 	if (ocircuit)
1886 		dmsg_circuit_drop(ocircuit);
1887 
1888 	/*
1889 	 * Short-cut one-off or mid-stream messages (state may be NULL).
1890 	 */
1891 	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1892 				  DMSGF_ABORT)) == 0) {
1893 		return(0);
1894 	}
1895 
1896 	/*
1897 	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1898 	 * inside the case statements.
1899 	 */
1900 	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1901 				    DMSGF_REPLY)) {
1902 	case DMSGF_CREATE:
1903 	case DMSGF_CREATE | DMSGF_DELETE:
1904 		/*
1905 		 * New persistent command received.
1906 		 */
1907 		if (state) {
1908 			fprintf(stderr, "duplicate-trans %s\n",
1909 				dmsg_msg_str(msg));
1910 			error = DMSG_IOQ_ERROR_TRANS;
1911 			assert(0);
1912 			break;
1913 		}
1914 		state = malloc(sizeof(*state));
1915 		bzero(state, sizeof(*state));
1916 		state->iocom = iocom;
1917 		state->circuit = circuit;
1918 		state->flags = DMSG_STATE_DYNAMIC;
1919 		state->msg = msg;
1920 		state->txcmd = DMSGF_REPLY;
1921 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1922 		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1923 		state->flags |= DMSG_STATE_INSERTED;
1924 		state->msgid = msg->any.head.msgid;
1925 		msg->state = state;
1926 		pthread_mutex_lock(&iocom->mtx);
1927 		RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
1928 		pthread_mutex_unlock(&iocom->mtx);
1929 		error = 0;
1930 		if (DMsgDebugOpt) {
1931 			fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1932 				state, (uint32_t)state->msgid, iocom);
1933 		}
1934 		break;
1935 	case DMSGF_DELETE:
1936 		/*
1937 		 * Persistent state is expected but might not exist if an
1938 		 * ABORT+DELETE races the close.
1939 		 */
1940 		if (state == NULL) {
1941 			if (msg->any.head.cmd & DMSGF_ABORT) {
1942 				error = DMSG_IOQ_ERROR_EALREADY;
1943 			} else {
1944 				fprintf(stderr, "missing-state %s\n",
1945 					dmsg_msg_str(msg));
1946 				error = DMSG_IOQ_ERROR_TRANS;
1947 				assert(0);
1948 			}
1949 			break;
1950 		}
1951 
1952 		/*
1953 		 * Handle another ABORT+DELETE case if the msgid has already
1954 		 * been reused.
1955 		 */
1956 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
1957 			if (msg->any.head.cmd & DMSGF_ABORT) {
1958 				error = DMSG_IOQ_ERROR_EALREADY;
1959 			} else {
1960 				fprintf(stderr, "reused-state %s\n",
1961 					dmsg_msg_str(msg));
1962 				error = DMSG_IOQ_ERROR_TRANS;
1963 				assert(0);
1964 			}
1965 			break;
1966 		}
1967 		error = 0;
1968 		break;
1969 	default:
1970 		/*
1971 		 * Check for a mid-stream ABORT command; otherwise allow
1972 		 * the message through.
1973 		 */
1974 		if (msg->any.head.cmd & DMSGF_ABORT) {
1975 			if (state == NULL ||
1976 			    (state->rxcmd & DMSGF_CREATE) == 0) {
1977 				error = DMSG_IOQ_ERROR_EALREADY;
1978 				break;
1979 			}
1980 		}
1981 		error = 0;
1982 		break;
1983 	case DMSGF_REPLY | DMSGF_CREATE:
1984 	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1985 		/*
1986 		 * When receiving a reply with CREATE set, the original
1987 		 * persistent state message should already exist.
1988 		 */
1989 		if (state == NULL) {
1990 			fprintf(stderr, "no-state(r) %s\n",
1991 				dmsg_msg_str(msg));
1992 			error = DMSG_IOQ_ERROR_TRANS;
1993 			assert(0);
1994 			break;
1995 		}
1996 		assert(((state->rxcmd ^ msg->any.head.cmd) &
1997 			DMSGF_REPLY) == 0);
1998 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1999 		error = 0;
2000 		break;
2001 	case DMSGF_REPLY | DMSGF_DELETE:
2002 		/*
2003 		 * If we receive REPLY+ABORT+DELETE for a msgid that has
2004 		 * already been fully closed, ignore the message.
2005 		 */
2006 		if (state == NULL) {
2007 			if (msg->any.head.cmd & DMSGF_ABORT) {
2008 				error = DMSG_IOQ_ERROR_EALREADY;
2009 			} else {
2010 				fprintf(stderr, "no-state(r,d) %s\n",
2011 					dmsg_msg_str(msg));
2012 				error = DMSG_IOQ_ERROR_TRANS;
2013 				assert(0);
2014 			}
2015 			break;
2016 		}
2017 
2018 		/*
2019 		 * If we receive REPLY+ABORT+DELETE for a msgid that has
2020 		 * already been reused for an unrelated message, ignore
2021 		 * the message.
2022 		 */
2023 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2024 			if (msg->any.head.cmd & DMSGF_ABORT) {
2025 				error = DMSG_IOQ_ERROR_EALREADY;
2026 			} else {
2027 				fprintf(stderr, "reused-state(r,d) %s\n",
2028 					dmsg_msg_str(msg));
2029 				error = DMSG_IOQ_ERROR_TRANS;
2030 				assert(0);
2031 			}
2032 			break;
2033 		}
2034 		error = 0;
2035 		break;
2036 	case DMSGF_REPLY:
2037 		/*
2038 		 * Check for a mid-stream ABORT reply to a command we sent.
2039 		 */
2040 		if (msg->any.head.cmd & DMSGF_ABORT) {
2041 			if (state == NULL ||
2042 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2043 				error = DMSG_IOQ_ERROR_EALREADY;
2044 				break;
2045 			}
2046 		}
2047 		error = 0;
2048 		break;
2049 	}
2050 	return (error);
2051 }
2052 
2053 void
2054 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2055 {
2056 	dmsg_state_t *state;
2057 
2058 	if ((state = msg->state) == NULL) {
2059 		/*
2060 		 * Free a non-transactional message; there is no state
2061 		 * to worry about.
2062 		 */
2063 		dmsg_msg_free(msg);
2064 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2065 		/*
2066 		 * Message terminating the transaction: destroy the related
2067 		 * state, the original message, and this message (if it
2068 		 * isn't the original message due to a CREATE|DELETE).
2069 		 */
2070 		pthread_mutex_lock(&iocom->mtx);
2071 		state->rxcmd |= DMSGF_DELETE;
2072 		if (state->txcmd & DMSGF_DELETE) {
2073 			if (state->msg == msg)
2074 				state->msg = NULL;
2075 			assert(state->flags & DMSG_STATE_INSERTED);
2076 			if (state->rxcmd & DMSGF_REPLY) {
2077 				assert(msg->any.head.cmd & DMSGF_REPLY);
2078 				RB_REMOVE(dmsg_state_tree,
2079 					  &msg->circuit->statewr_tree, state);
2080 			} else {
2081 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2082 				RB_REMOVE(dmsg_state_tree,
2083 					  &msg->circuit->staterd_tree, state);
2084 			}
2085 			state->flags &= ~DMSG_STATE_INSERTED;
2086 			dmsg_state_free(state);
2087 		}
2090 		pthread_mutex_unlock(&iocom->mtx);
2091 		dmsg_msg_free(msg);
2092 	} else if (state->msg != msg) {
2093 		/*
2094 		 * Message not terminating the transaction: leave the state
2095 		 * intact and free the message if it isn't the CREATE message.
2096 		 */
2097 		dmsg_msg_free(msg);
2098 	}
2099 }
2100 
2101 static void
2102 dmsg_state_cleanuptx(dmsg_msg_t *msg)
2103 {
2104 	dmsg_iocom_t *iocom = msg->iocom;
2105 	dmsg_state_t *state;
2106 
2107 	if ((state = msg->state) == NULL) {
2108 		dmsg_msg_free(msg);
2109 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2110 		pthread_mutex_lock(&iocom->mtx);
2111 		assert((state->txcmd & DMSGF_DELETE) == 0);
2112 		state->txcmd |= DMSGF_DELETE;
2113 		if (state->rxcmd & DMSGF_DELETE) {
2114 			if (state->msg == msg)
2115 				state->msg = NULL;
2116 			assert(state->flags & DMSG_STATE_INSERTED);
2117 			if (state->txcmd & DMSGF_REPLY) {
2118 				assert(msg->any.head.cmd & DMSGF_REPLY);
2119 				RB_REMOVE(dmsg_state_tree,
2120 					  &msg->circuit->staterd_tree, state);
2121 			} else {
2122 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2123 				RB_REMOVE(dmsg_state_tree,
2124 					  &msg->circuit->statewr_tree, state);
2125 			}
2126 			state->flags &= ~DMSG_STATE_INSERTED;
2127 			dmsg_state_free(state);
2128 		}
2131 		pthread_mutex_unlock(&iocom->mtx);
2132 		dmsg_msg_free(msg);
2133 	} else if (state->msg != msg) {
2134 		dmsg_msg_free(msg);
2135 	}
2136 }
2137 
2138 /*
2139  * Called with iocom locked
2140  */
2141 void
2142 dmsg_state_free(dmsg_state_t *state)
2143 {
2144 	dmsg_msg_t *msg;
2145 
2146 	if (DMsgDebugOpt) {
2147 		fprintf(stderr, "terminate state %p id=%08x\n",
2148 			state, (uint32_t)state->msgid);
2149 	}
2150 	if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2151 		closefrom(3);
2152 	assert(state->any.any == NULL);
2153 	msg = state->msg;
2154 	state->msg = NULL;
2155 	if (msg)
2156 		dmsg_msg_free_locked(msg);
2157 	free(state);
2158 }
2159 
2160 /*
2161  * Add a reference to a circuit.  The caller must already hold a reference.
2162  */
2163 void
2164 dmsg_circuit_hold(dmsg_circuit_t *circuit)
2165 {
2166 	assert(circuit->refs > 0);		/* caller must hold ref */
2167 	atomic_add_int(&circuit->refs, 1);	/* to safely add more */
2168 }
2169 
2170 /*
2171  * Drop a circuit reference (the iocom lock is taken internally on final drop).
2172  */
2173 void
2174 dmsg_circuit_drop(dmsg_circuit_t *circuit)
2175 {
2176 	dmsg_iocom_t *iocom = circuit->iocom;
2177 	char dummy;
2178 
2179 	assert(circuit->refs > 0);
2180 	assert(iocom);
2181 
2182 	/*
2183 	 * Decrement the circuit ref count; destroy the circuit when it reaches 0.
2184 	 */
2185 	if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
2186 		return;
2187 	assert(circuit != &iocom->circuit0);
2188 
2189 	assert(RB_EMPTY(&circuit->staterd_tree));
2190 	assert(RB_EMPTY(&circuit->statewr_tree));
2191 	pthread_mutex_lock(&iocom->mtx);
2192 	RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2193 	circuit->iocom = NULL;
2194 	pthread_mutex_unlock(&iocom->mtx);
2195 	dmsg_free(circuit);
2196 
2197 	/*
2198 	 * When an iocom error is present the rx code will terminate the
2199 	 * receive side for all transactions and (indirectly) all circuits
2200 	 * by simulating DELETE messages.  The state and related circuits
2201 	 * don't disappear until the related states are closed in both
2202 	 * directions.
2203 	 *
2204 	 * Detect the case where the last circuit is now gone (and thus all
2205 	 * states for all circuits are gone), and wakeup the rx thread to
2206 	 * complete the termination.
2207 	 */
2208 	if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2209 		dummy = 0;
2210 		write(iocom->wakeupfds[1], &dummy, 1);
2211 	}
2212 }
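
/*
 * Illustrative sketch only (compiled out, not part of the library): the
 * hold/drop discipline.  A thread that already owns a reference (for
 * example via a received message's circuit pointer) takes an extra
 * reference before handing the circuit to another consumer, which then
 * drops it when done.  The function name is an assumption for illustration.
 */
#if 0
static void
example_pass_circuit(dmsg_circuit_t *circuit)
{
	dmsg_circuit_hold(circuit);	/* take an extra ref; we already own one */
	/* ... hand 'circuit' to another thread or structure ... */
	dmsg_circuit_drop(circuit);	/* consumer releases its ref when done */
}
#endif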
2213 
2214 void
2215 dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
2216 {
2217 	dmsg_iocom_t *iocom;
2218 
2219 	iocom = circuit->iocom;
2220 	assert(circuit->refs > 0);
2221 	assert(iocom);
2222 
2223 	if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
2224 		assert(circuit != &iocom->circuit0);
2225 		assert(RB_EMPTY(&circuit->staterd_tree));
2226 		assert(RB_EMPTY(&circuit->statewr_tree));
2227 		RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2228 		circuit->iocom = NULL;
2229 		dmsg_free(circuit);
2230 		if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2231 			char dummy = 0;
2232 			write(iocom->wakeupfds[1], &dummy, 1);
2233 		}
2234 	}
2235 }
2236 
2237 /*
2238  * This byte-swaps the endianness of a dmsg_hdr_t.  Note that the extended
2239  * header is not adjusted, just the core header.
2240  */
2241 void
2242 dmsg_bswap_head(dmsg_hdr_t *head)
2243 {
2244 	head->magic	= bswap16(head->magic);
2245 	head->reserved02 = bswap16(head->reserved02);
2246 	head->salt	= bswap32(head->salt);
2247 
2248 	head->msgid	= bswap64(head->msgid);
2249 	head->circuit	= bswap64(head->circuit);
2250 	head->reserved18= bswap64(head->reserved18);
2251 
2252 	head->cmd	= bswap32(head->cmd);
2253 	head->aux_crc	= bswap32(head->aux_crc);
2254 	head->aux_bytes	= bswap32(head->aux_bytes);
2255 	head->error	= bswap32(head->error);
2256 	head->aux_descr = bswap64(head->aux_descr);
2257 	head->reserved38= bswap32(head->reserved38);
2258 	head->hdr_crc	= bswap32(head->hdr_crc);
2259 }
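
/*
 * Illustrative sketch only (compiled out, not part of the library): a
 * receiver could normalize a header that arrived in the opposite byte
 * order before interpreting it.  The DMSG_HDR_MAGIC_REV constant and the
 * function name are assumptions made for illustration.
 */
#if 0
static int
example_normalize_head(dmsg_hdr_t *head)
{
	if (head->magic == DMSG_HDR_MAGIC_REV) {
		dmsg_bswap_head(head);	/* core header only */
		return (1);		/* header was byte-swapped */
	}
	return (0);			/* already in native byte order */
}
#endif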
2260