xref: /dragonfly/lib/libdmsg/msg.c (revision 024de405)
10c3a8cd0SMatthew Dillon /*
20c3a8cd0SMatthew Dillon  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
30c3a8cd0SMatthew Dillon  *
40c3a8cd0SMatthew Dillon  * This code is derived from software contributed to The DragonFly Project
50c3a8cd0SMatthew Dillon  * by Matthew Dillon <dillon@dragonflybsd.org>
60c3a8cd0SMatthew Dillon  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
70c3a8cd0SMatthew Dillon  *
80c3a8cd0SMatthew Dillon  * Redistribution and use in source and binary forms, with or without
90c3a8cd0SMatthew Dillon  * modification, are permitted provided that the following conditions
100c3a8cd0SMatthew Dillon  * are met:
110c3a8cd0SMatthew Dillon  *
120c3a8cd0SMatthew Dillon  * 1. Redistributions of source code must retain the above copyright
130c3a8cd0SMatthew Dillon  *    notice, this list of conditions and the following disclaimer.
140c3a8cd0SMatthew Dillon  * 2. Redistributions in binary form must reproduce the above copyright
150c3a8cd0SMatthew Dillon  *    notice, this list of conditions and the following disclaimer in
160c3a8cd0SMatthew Dillon  *    the documentation and/or other materials provided with the
170c3a8cd0SMatthew Dillon  *    distribution.
180c3a8cd0SMatthew Dillon  * 3. Neither the name of The DragonFly Project nor the names of its
190c3a8cd0SMatthew Dillon  *    contributors may be used to endorse or promote products derived
200c3a8cd0SMatthew Dillon  *    from this software without specific, prior written permission.
210c3a8cd0SMatthew Dillon  *
220c3a8cd0SMatthew Dillon  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
230c3a8cd0SMatthew Dillon  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
240c3a8cd0SMatthew Dillon  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
250c3a8cd0SMatthew Dillon  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
260c3a8cd0SMatthew Dillon  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
270c3a8cd0SMatthew Dillon  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
280c3a8cd0SMatthew Dillon  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
290c3a8cd0SMatthew Dillon  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
300c3a8cd0SMatthew Dillon  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
310c3a8cd0SMatthew Dillon  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
320c3a8cd0SMatthew Dillon  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
330c3a8cd0SMatthew Dillon  * SUCH DAMAGE.
340c3a8cd0SMatthew Dillon  */
350c3a8cd0SMatthew Dillon 
360c3a8cd0SMatthew Dillon #include "dmsg_local.h"
370c3a8cd0SMatthew Dillon 
380c3a8cd0SMatthew Dillon int DMsgDebugOpt;
39323c0947SMatthew Dillon int dmsg_state_count;
400c3a8cd0SMatthew Dillon 
410c3a8cd0SMatthew Dillon static int dmsg_state_msgrx(dmsg_msg_t *msg);
421b8eded1SMatthew Dillon static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
43a2179323SMatthew Dillon static void dmsg_msg_free_locked(dmsg_msg_t *msg);
44323c0947SMatthew Dillon static void dmsg_state_free(dmsg_state_t *state);
45323c0947SMatthew Dillon static void dmsg_msg_simulate_failure(dmsg_state_t *state, int error);
460c3a8cd0SMatthew Dillon 
470d20ec8aSMatthew Dillon RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
480c3a8cd0SMatthew Dillon 
490c3a8cd0SMatthew Dillon /*
500c3a8cd0SMatthew Dillon  * STATE TREE - Represents open transactions which are indexed by their
510d20ec8aSMatthew Dillon  *		{ msgid } relative to the governing iocom.
520c3a8cd0SMatthew Dillon  */
530c3a8cd0SMatthew Dillon int
540c3a8cd0SMatthew Dillon dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
550c3a8cd0SMatthew Dillon {
560c3a8cd0SMatthew Dillon 	if (state1->msgid < state2->msgid)
570c3a8cd0SMatthew Dillon 		return(-1);
580c3a8cd0SMatthew Dillon 	if (state1->msgid > state2->msgid)
590c3a8cd0SMatthew Dillon 		return(1);
600c3a8cd0SMatthew Dillon 	return(0);
610c3a8cd0SMatthew Dillon }
620c3a8cd0SMatthew Dillon 
630d20ec8aSMatthew Dillon /*
640c3a8cd0SMatthew Dillon  * Initialize a low-level ioq
650c3a8cd0SMatthew Dillon  */
660c3a8cd0SMatthew Dillon void
670c3a8cd0SMatthew Dillon dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
680c3a8cd0SMatthew Dillon {
690c3a8cd0SMatthew Dillon 	bzero(ioq, sizeof(*ioq));
700c3a8cd0SMatthew Dillon 	ioq->state = DMSG_MSGQ_STATE_HEADER1;
710c3a8cd0SMatthew Dillon 	TAILQ_INIT(&ioq->msgq);
720c3a8cd0SMatthew Dillon }
730c3a8cd0SMatthew Dillon 
740c3a8cd0SMatthew Dillon /*
750c3a8cd0SMatthew Dillon  * Cleanup queue.
760c3a8cd0SMatthew Dillon  *
770c3a8cd0SMatthew Dillon  * caller holds iocom->mtx.
780c3a8cd0SMatthew Dillon  */
790c3a8cd0SMatthew Dillon void
800c3a8cd0SMatthew Dillon dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
810c3a8cd0SMatthew Dillon {
820c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
830c3a8cd0SMatthew Dillon 
840c3a8cd0SMatthew Dillon 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
850c3a8cd0SMatthew Dillon 		assert(0);	/* shouldn't happen */
860c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
870c3a8cd0SMatthew Dillon 		dmsg_msg_free(msg);
880c3a8cd0SMatthew Dillon 	}
890c3a8cd0SMatthew Dillon 	if ((msg = ioq->msg) != NULL) {
900c3a8cd0SMatthew Dillon 		ioq->msg = NULL;
910c3a8cd0SMatthew Dillon 		dmsg_msg_free(msg);
920c3a8cd0SMatthew Dillon 	}
930c3a8cd0SMatthew Dillon }
940c3a8cd0SMatthew Dillon 
950c3a8cd0SMatthew Dillon /*
960c3a8cd0SMatthew Dillon  * Initialize a low-level communications channel.
970c3a8cd0SMatthew Dillon  *
980c3a8cd0SMatthew Dillon  * NOTE: The signal_func() is called at least once from the loop and can be
990c3a8cd0SMatthew Dillon  *	 re-armed via dmsg_iocom_restate().
1000c3a8cd0SMatthew Dillon  */
1010c3a8cd0SMatthew Dillon void
1020c3a8cd0SMatthew Dillon dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
10301e43224SMatthew Dillon 		   void (*signal_func)(dmsg_iocom_t *iocom),
10401e43224SMatthew Dillon 		   void (*rcvmsg_func)(dmsg_msg_t *msg),
10501e43224SMatthew Dillon 		   void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
10601e43224SMatthew Dillon 		   void (*altmsg_func)(dmsg_iocom_t *iocom))
1070c3a8cd0SMatthew Dillon {
1080c3a8cd0SMatthew Dillon 	struct stat st;
1090c3a8cd0SMatthew Dillon 
1100c3a8cd0SMatthew Dillon 	bzero(iocom, sizeof(*iocom));
1110c3a8cd0SMatthew Dillon 
112f306de83SMatthew Dillon 	asprintf(&iocom->label, "iocom-%p", iocom);
1130d20ec8aSMatthew Dillon 	iocom->signal_callback = signal_func;
1140d20ec8aSMatthew Dillon 	iocom->rcvmsg_callback = rcvmsg_func;
1150d20ec8aSMatthew Dillon 	iocom->altmsg_callback = altmsg_func;
11601e43224SMatthew Dillon 	iocom->usrmsg_callback = usrmsg_func;
1170c3a8cd0SMatthew Dillon 
1180c3a8cd0SMatthew Dillon 	pthread_mutex_init(&iocom->mtx, NULL);
1191b8eded1SMatthew Dillon 	RB_INIT(&iocom->staterd_tree);
1201b8eded1SMatthew Dillon 	RB_INIT(&iocom->statewr_tree);
1210c3a8cd0SMatthew Dillon 	TAILQ_INIT(&iocom->freeq);
1220c3a8cd0SMatthew Dillon 	TAILQ_INIT(&iocom->freeq_aux);
1230d20ec8aSMatthew Dillon 	TAILQ_INIT(&iocom->txmsgq);
1240c3a8cd0SMatthew Dillon 	iocom->sock_fd = sock_fd;
1250c3a8cd0SMatthew Dillon 	iocom->alt_fd = alt_fd;
12698126869SMatthew Dillon 	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
1270c3a8cd0SMatthew Dillon 	if (signal_func)
1280c3a8cd0SMatthew Dillon 		iocom->flags |= DMSG_IOCOMF_SWORK;
1290c3a8cd0SMatthew Dillon 	dmsg_ioq_init(iocom, &iocom->ioq_rx);
1300c3a8cd0SMatthew Dillon 	dmsg_ioq_init(iocom, &iocom->ioq_tx);
131323c0947SMatthew Dillon 	iocom->state0.refs = 1;		/* should never trigger a free */
1321b8eded1SMatthew Dillon 	iocom->state0.iocom = iocom;
1331b8eded1SMatthew Dillon 	iocom->state0.parent = &iocom->state0;
134d30cab67SMatthew Dillon 	iocom->state0.flags = DMSG_STATE_ROOT;
1351b8eded1SMatthew Dillon 	TAILQ_INIT(&iocom->state0.subq);
1361b8eded1SMatthew Dillon 
1370c3a8cd0SMatthew Dillon 	if (pipe(iocom->wakeupfds) < 0)
1380c3a8cd0SMatthew Dillon 		assert(0);
1390c3a8cd0SMatthew Dillon 	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
1400c3a8cd0SMatthew Dillon 	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
1410c3a8cd0SMatthew Dillon 
1420c3a8cd0SMatthew Dillon 	/*
1430c3a8cd0SMatthew Dillon 	 * Negotiate session crypto synchronously.  This will mark the
1440c3a8cd0SMatthew Dillon 	 * connection as error'd if it fails.  If this is a pipe it's
1450c3a8cd0SMatthew Dillon 	 * a linkage that we set up ourselves to the filesystem and there
1460c3a8cd0SMatthew Dillon 	 * is no crypto.
1470c3a8cd0SMatthew Dillon 	 */
1480c3a8cd0SMatthew Dillon 	if (fstat(sock_fd, &st) < 0)
1490c3a8cd0SMatthew Dillon 		assert(0);
1500c3a8cd0SMatthew Dillon 	if (S_ISSOCK(st.st_mode))
1510c3a8cd0SMatthew Dillon 		dmsg_crypto_negotiate(iocom);
1520c3a8cd0SMatthew Dillon 
1530c3a8cd0SMatthew Dillon 	/*
1540c3a8cd0SMatthew Dillon 	 * Make sure our fds are set to non-blocking for the iocom core.
1550c3a8cd0SMatthew Dillon 	 */
1560c3a8cd0SMatthew Dillon 	if (sock_fd >= 0)
1570c3a8cd0SMatthew Dillon 		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
1580c3a8cd0SMatthew Dillon #if 0
1590c3a8cd0SMatthew Dillon 	/* if line buffered our single fgets() should be fine */
1600c3a8cd0SMatthew Dillon 	if (alt_fd >= 0)
1610c3a8cd0SMatthew Dillon 		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
1620c3a8cd0SMatthew Dillon #endif
1630c3a8cd0SMatthew Dillon }
1640c3a8cd0SMatthew Dillon 
165f306de83SMatthew Dillon void
166f306de83SMatthew Dillon dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
167f306de83SMatthew Dillon {
168f306de83SMatthew Dillon 	va_list va;
169f306de83SMatthew Dillon 	char *optr;
170f306de83SMatthew Dillon 
171f306de83SMatthew Dillon 	va_start(va, ctl);
172f306de83SMatthew Dillon 	optr = iocom->label;
173f306de83SMatthew Dillon 	vasprintf(&iocom->label, ctl, va);
174f306de83SMatthew Dillon 	va_end(va);
175f306de83SMatthew Dillon 	if (optr)
176f306de83SMatthew Dillon 		free(optr);
177f306de83SMatthew Dillon }
178f306de83SMatthew Dillon 
1790c3a8cd0SMatthew Dillon /*
1800c3a8cd0SMatthew Dillon  * May only be called from a callback from iocom_core.
1810c3a8cd0SMatthew Dillon  *
1820c3a8cd0SMatthew Dillon  * Adjust state machine functions, set flags to guarantee that both
1830c3a8cd0SMatthew Dillon  * the recevmsg_func and the sendmsg_func is called at least once.
1840c3a8cd0SMatthew Dillon  */
1850c3a8cd0SMatthew Dillon void
1860d20ec8aSMatthew Dillon dmsg_iocom_restate(dmsg_iocom_t *iocom,
1870d20ec8aSMatthew Dillon 		   void (*signal_func)(dmsg_iocom_t *),
18801e43224SMatthew Dillon 		   void (*rcvmsg_func)(dmsg_msg_t *msg))
1890c3a8cd0SMatthew Dillon {
190a2179323SMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
1910d20ec8aSMatthew Dillon 	iocom->signal_callback = signal_func;
1920d20ec8aSMatthew Dillon 	iocom->rcvmsg_callback = rcvmsg_func;
1930c3a8cd0SMatthew Dillon 	if (signal_func)
194a2179323SMatthew Dillon 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
1950c3a8cd0SMatthew Dillon 	else
196a2179323SMatthew Dillon 		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
197a2179323SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
1980c3a8cd0SMatthew Dillon }
1990c3a8cd0SMatthew Dillon 
2000c3a8cd0SMatthew Dillon void
2010d20ec8aSMatthew Dillon dmsg_iocom_signal(dmsg_iocom_t *iocom)
2020c3a8cd0SMatthew Dillon {
203a2179323SMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
2040d20ec8aSMatthew Dillon 	if (iocom->signal_callback)
205a2179323SMatthew Dillon 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206a2179323SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
2070c3a8cd0SMatthew Dillon }
2080c3a8cd0SMatthew Dillon 
2090c3a8cd0SMatthew Dillon /*
2100c3a8cd0SMatthew Dillon  * Cleanup a terminating iocom.
2110c3a8cd0SMatthew Dillon  *
2120c3a8cd0SMatthew Dillon  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
2130c3a8cd0SMatthew Dillon  * from all possible references to it.
2140c3a8cd0SMatthew Dillon  */
2150c3a8cd0SMatthew Dillon void
2160c3a8cd0SMatthew Dillon dmsg_iocom_done(dmsg_iocom_t *iocom)
2170c3a8cd0SMatthew Dillon {
2180c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
2190c3a8cd0SMatthew Dillon 
2200c3a8cd0SMatthew Dillon 	if (iocom->sock_fd >= 0) {
2210c3a8cd0SMatthew Dillon 		close(iocom->sock_fd);
2220c3a8cd0SMatthew Dillon 		iocom->sock_fd = -1;
2230c3a8cd0SMatthew Dillon 	}
22498126869SMatthew Dillon 	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
2250c3a8cd0SMatthew Dillon 		close(iocom->alt_fd);
2260c3a8cd0SMatthew Dillon 		iocom->alt_fd = -1;
2270c3a8cd0SMatthew Dillon 	}
2280c3a8cd0SMatthew Dillon 	dmsg_ioq_done(iocom, &iocom->ioq_rx);
2290c3a8cd0SMatthew Dillon 	dmsg_ioq_done(iocom, &iocom->ioq_tx);
230a2179323SMatthew Dillon 	while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
2310c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&iocom->freeq, msg, qentry);
2320c3a8cd0SMatthew Dillon 		free(msg);
2330c3a8cd0SMatthew Dillon 	}
234a2179323SMatthew Dillon 	while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
2350c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
2360c3a8cd0SMatthew Dillon 		free(msg->aux_data);
2370c3a8cd0SMatthew Dillon 		msg->aux_data = NULL;
2380c3a8cd0SMatthew Dillon 		free(msg);
2390c3a8cd0SMatthew Dillon 	}
2400c3a8cd0SMatthew Dillon 	if (iocom->wakeupfds[0] >= 0) {
2410c3a8cd0SMatthew Dillon 		close(iocom->wakeupfds[0]);
2420c3a8cd0SMatthew Dillon 		iocom->wakeupfds[0] = -1;
2430c3a8cd0SMatthew Dillon 	}
2440c3a8cd0SMatthew Dillon 	if (iocom->wakeupfds[1] >= 0) {
2450c3a8cd0SMatthew Dillon 		close(iocom->wakeupfds[1]);
2460c3a8cd0SMatthew Dillon 		iocom->wakeupfds[1] = -1;
2470c3a8cd0SMatthew Dillon 	}
2480c3a8cd0SMatthew Dillon 	pthread_mutex_destroy(&iocom->mtx);
2490c3a8cd0SMatthew Dillon }
2500c3a8cd0SMatthew Dillon 
2510c3a8cd0SMatthew Dillon /*
2521b8eded1SMatthew Dillon  * Allocate a new message using the specified transaction state.
253a2179323SMatthew Dillon  *
2541b8eded1SMatthew Dillon  * If CREATE is set a new transaction is allocated relative to the passed-in
255d30cab67SMatthew Dillon  * transaction (the 'state' argument becomes pstate).
2561b8eded1SMatthew Dillon  *
2571b8eded1SMatthew Dillon  * If CREATE is not set the message is associated with the passed-in
2581b8eded1SMatthew Dillon  * transaction.
2590c3a8cd0SMatthew Dillon  */
2600c3a8cd0SMatthew Dillon dmsg_msg_t *
2611b8eded1SMatthew Dillon dmsg_msg_alloc(dmsg_state_t *state,
2620d20ec8aSMatthew Dillon 	       size_t aux_size, uint32_t cmd,
2630c3a8cd0SMatthew Dillon 	       void (*func)(dmsg_msg_t *), void *data)
2640c3a8cd0SMatthew Dillon {
2651b8eded1SMatthew Dillon 	dmsg_iocom_t *iocom = state->iocom;
266323c0947SMatthew Dillon 	dmsg_msg_t *msg;
267323c0947SMatthew Dillon 
268323c0947SMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
269323c0947SMatthew Dillon 	msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
270323c0947SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
271323c0947SMatthew Dillon 
272323c0947SMatthew Dillon 	return msg;
273323c0947SMatthew Dillon }
274323c0947SMatthew Dillon 
275323c0947SMatthew Dillon dmsg_msg_t *
276323c0947SMatthew Dillon dmsg_msg_alloc_locked(dmsg_state_t *state,
277323c0947SMatthew Dillon 	       size_t aux_size, uint32_t cmd,
278323c0947SMatthew Dillon 	       void (*func)(dmsg_msg_t *), void *data)
279323c0947SMatthew Dillon {
280323c0947SMatthew Dillon 	dmsg_iocom_t *iocom = state->iocom;
2811b8eded1SMatthew Dillon 	dmsg_state_t *pstate;
2820c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
2830c3a8cd0SMatthew Dillon 	int hbytes;
284f306de83SMatthew Dillon 	size_t aligned_size;
2850c3a8cd0SMatthew Dillon 
286a2179323SMatthew Dillon #if 0
2870c3a8cd0SMatthew Dillon 	if (aux_size) {
288f306de83SMatthew Dillon 		aligned_size = DMSG_DOALIGN(aux_size);
2890c3a8cd0SMatthew Dillon 		if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
2900c3a8cd0SMatthew Dillon 			TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
2910c3a8cd0SMatthew Dillon 	} else {
292f306de83SMatthew Dillon 		aligned_size = 0;
2930c3a8cd0SMatthew Dillon 		if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
2940c3a8cd0SMatthew Dillon 			TAILQ_REMOVE(&iocom->freeq, msg, qentry);
2950c3a8cd0SMatthew Dillon 	}
296a2179323SMatthew Dillon #endif
297a2179323SMatthew Dillon 	aligned_size = DMSG_DOALIGN(aux_size);
298a2179323SMatthew Dillon 	msg = NULL;
2990c3a8cd0SMatthew Dillon 	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
3000c3a8cd0SMatthew Dillon 		/*
3011b8eded1SMatthew Dillon 		 * When CREATE is set without REPLY the caller is
3021b8eded1SMatthew Dillon 		 * initiating a new transaction stacked under the specified
3031b8eded1SMatthew Dillon 		 * circuit.
3040c3a8cd0SMatthew Dillon 		 *
3050c3a8cd0SMatthew Dillon 		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
3060c3a8cd0SMatthew Dillon 		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
3070c3a8cd0SMatthew Dillon 		 */
3081b8eded1SMatthew Dillon 		pstate = state;
3090c3a8cd0SMatthew Dillon 		state = malloc(sizeof(*state));
310323c0947SMatthew Dillon 		atomic_add_int(&dmsg_state_count, 1);
3110c3a8cd0SMatthew Dillon 		bzero(state, sizeof(*state));
3121b8eded1SMatthew Dillon 		TAILQ_INIT(&state->subq);
313323c0947SMatthew Dillon 		dmsg_state_hold(pstate);
314323c0947SMatthew Dillon 		state->refs = 1;
3151b8eded1SMatthew Dillon 		state->parent = pstate;
3160c3a8cd0SMatthew Dillon 		state->iocom = iocom;
3170c3a8cd0SMatthew Dillon 		state->flags = DMSG_STATE_DYNAMIC;
3180c3a8cd0SMatthew Dillon 		state->msgid = (uint64_t)(uintptr_t)state;
3190c3a8cd0SMatthew Dillon 		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
3200c3a8cd0SMatthew Dillon 		state->rxcmd = DMSGF_REPLY;
3210d20ec8aSMatthew Dillon 		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
3220c3a8cd0SMatthew Dillon 		state->func = func;
3230c3a8cd0SMatthew Dillon 		state->any.any = data;
324d30cab67SMatthew Dillon 
3251b8eded1SMatthew Dillon 		RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
3261b8eded1SMatthew Dillon 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
3270c3a8cd0SMatthew Dillon 		state->flags |= DMSG_STATE_INSERTED;
3281b8eded1SMatthew Dillon 	} else {
3291b8eded1SMatthew Dillon 		/*
3301b8eded1SMatthew Dillon 		 * Otherwise the message is transmitted over the existing
3311b8eded1SMatthew Dillon 		 * open transaction.
3321b8eded1SMatthew Dillon 		 */
3331b8eded1SMatthew Dillon 		pstate = state->parent;
3340c3a8cd0SMatthew Dillon 	}
3351b8eded1SMatthew Dillon 
336a2179323SMatthew Dillon 	/* XXX SMP race for state */
337a2179323SMatthew Dillon 	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
3380c3a8cd0SMatthew Dillon 	if (msg == NULL) {
339a2179323SMatthew Dillon 		msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
340a2179323SMatthew Dillon 		bzero(msg, offsetof(struct dmsg_msg, any.head));
341a2179323SMatthew Dillon 		*(int *)((char *)msg +
342a2179323SMatthew Dillon 			 offsetof(struct dmsg_msg, any.head) + hbytes) =
343a2179323SMatthew Dillon 				 0x71B2C3D4;
344a2179323SMatthew Dillon #if 0
3450c3a8cd0SMatthew Dillon 		msg = malloc(sizeof(*msg));
3460c3a8cd0SMatthew Dillon 		bzero(msg, sizeof(*msg));
347a2179323SMatthew Dillon #endif
3480c3a8cd0SMatthew Dillon 	}
349f306de83SMatthew Dillon 
350f306de83SMatthew Dillon 	/*
351f306de83SMatthew Dillon 	 * [re]allocate the auxillary data buffer.  The caller knows that
352f306de83SMatthew Dillon 	 * a size-aligned buffer will be allocated but we do not want to
353f306de83SMatthew Dillon 	 * force the caller to zero any tail piece, so we do that ourself.
354f306de83SMatthew Dillon 	 */
3550c3a8cd0SMatthew Dillon 	if (msg->aux_size != aux_size) {
3560c3a8cd0SMatthew Dillon 		if (msg->aux_data) {
3570c3a8cd0SMatthew Dillon 			free(msg->aux_data);
3580c3a8cd0SMatthew Dillon 			msg->aux_data = NULL;
3590c3a8cd0SMatthew Dillon 			msg->aux_size = 0;
3600c3a8cd0SMatthew Dillon 		}
3610c3a8cd0SMatthew Dillon 		if (aux_size) {
362f306de83SMatthew Dillon 			msg->aux_data = malloc(aligned_size);
3630c3a8cd0SMatthew Dillon 			msg->aux_size = aux_size;
364f306de83SMatthew Dillon 			if (aux_size != aligned_size) {
365f306de83SMatthew Dillon 				bzero(msg->aux_data + aux_size,
366f306de83SMatthew Dillon 				      aligned_size - aux_size);
367f306de83SMatthew Dillon 			}
3680c3a8cd0SMatthew Dillon 		}
3690c3a8cd0SMatthew Dillon 	}
3701b8eded1SMatthew Dillon 
3711b8eded1SMatthew Dillon 	/*
3721b8eded1SMatthew Dillon 	 * Set REVTRANS if the transaction was remotely initiated
3731b8eded1SMatthew Dillon 	 * Set REVCIRC if the circuit was remotely initiated
3741b8eded1SMatthew Dillon 	 */
3751b8eded1SMatthew Dillon 	if (state->flags & DMSG_STATE_OPPOSITE)
3761b8eded1SMatthew Dillon 		cmd |= DMSGF_REVTRANS;
3771b8eded1SMatthew Dillon 	if (pstate->flags & DMSG_STATE_OPPOSITE)
3781b8eded1SMatthew Dillon 		cmd |= DMSGF_REVCIRC;
3791b8eded1SMatthew Dillon 
3801b8eded1SMatthew Dillon 	/*
3811b8eded1SMatthew Dillon 	 * Finish filling out the header.
3821b8eded1SMatthew Dillon 	 */
3830c3a8cd0SMatthew Dillon 	if (hbytes)
3840c3a8cd0SMatthew Dillon 		bzero(&msg->any.head, hbytes);
3850c3a8cd0SMatthew Dillon 	msg->hdr_size = hbytes;
3860d20ec8aSMatthew Dillon 	msg->any.head.magic = DMSG_HDR_MAGIC;
3870c3a8cd0SMatthew Dillon 	msg->any.head.cmd = cmd;
3880c3a8cd0SMatthew Dillon 	msg->any.head.aux_descr = 0;
3890c3a8cd0SMatthew Dillon 	msg->any.head.aux_crc = 0;
3900c3a8cd0SMatthew Dillon 	msg->any.head.msgid = state->msgid;
3911b8eded1SMatthew Dillon 	msg->any.head.circuit = pstate->msgid;
3921b8eded1SMatthew Dillon 	msg->state = state;
3931b8eded1SMatthew Dillon 
3940c3a8cd0SMatthew Dillon 	return (msg);
3950c3a8cd0SMatthew Dillon }
3960c3a8cd0SMatthew Dillon 
3970c3a8cd0SMatthew Dillon /*
3980c3a8cd0SMatthew Dillon  * Free a message so it can be reused afresh.
3990c3a8cd0SMatthew Dillon  *
4000c3a8cd0SMatthew Dillon  * NOTE: aux_size can be 0 with a non-NULL aux_data.
4010c3a8cd0SMatthew Dillon  */
4020c3a8cd0SMatthew Dillon static
4030c3a8cd0SMatthew Dillon void
4040c3a8cd0SMatthew Dillon dmsg_msg_free_locked(dmsg_msg_t *msg)
4050c3a8cd0SMatthew Dillon {
406a2179323SMatthew Dillon 	/*dmsg_iocom_t *iocom = msg->iocom;*/
407a2179323SMatthew Dillon #if 1
408a2179323SMatthew Dillon 	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
409a2179323SMatthew Dillon 	if (*(int *)((char *)msg +
410a2179323SMatthew Dillon 		     offsetof(struct  dmsg_msg, any.head) + hbytes) !=
411a2179323SMatthew Dillon 	     0x71B2C3D4) {
412a2179323SMatthew Dillon 		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
413a2179323SMatthew Dillon 		assert(0);
414a2179323SMatthew Dillon 	}
415a2179323SMatthew Dillon #endif
416323c0947SMatthew Dillon 	msg->state = NULL;	/* safety */
417a2179323SMatthew Dillon 	if (msg->aux_data) {
418a2179323SMatthew Dillon 		free(msg->aux_data);
419a2179323SMatthew Dillon 		msg->aux_data = NULL;
420a2179323SMatthew Dillon 	}
421a2179323SMatthew Dillon 	msg->aux_size = 0;
422a2179323SMatthew Dillon 	free (msg);
423a2179323SMatthew Dillon #if 0
4240c3a8cd0SMatthew Dillon 	if (msg->aux_data)
4250c3a8cd0SMatthew Dillon 		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
4260c3a8cd0SMatthew Dillon 	else
4270c3a8cd0SMatthew Dillon 		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
428a2179323SMatthew Dillon #endif
4290c3a8cd0SMatthew Dillon }
4300c3a8cd0SMatthew Dillon 
4310c3a8cd0SMatthew Dillon void
4320c3a8cd0SMatthew Dillon dmsg_msg_free(dmsg_msg_t *msg)
4330c3a8cd0SMatthew Dillon {
4341b8eded1SMatthew Dillon 	dmsg_iocom_t *iocom = msg->state->iocom;
4350c3a8cd0SMatthew Dillon 
4360c3a8cd0SMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
4370c3a8cd0SMatthew Dillon 	dmsg_msg_free_locked(msg);
4380c3a8cd0SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
4390c3a8cd0SMatthew Dillon }
4400c3a8cd0SMatthew Dillon 
4410c3a8cd0SMatthew Dillon /*
4420c3a8cd0SMatthew Dillon  * I/O core loop for an iocom.
4430c3a8cd0SMatthew Dillon  *
4440c3a8cd0SMatthew Dillon  * Thread localized, iocom->mtx not held.
4450c3a8cd0SMatthew Dillon  */
4460c3a8cd0SMatthew Dillon void
4470c3a8cd0SMatthew Dillon dmsg_iocom_core(dmsg_iocom_t *iocom)
4480c3a8cd0SMatthew Dillon {
4490c3a8cd0SMatthew Dillon 	struct pollfd fds[3];
4500c3a8cd0SMatthew Dillon 	char dummybuf[256];
4510c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
4520c3a8cd0SMatthew Dillon 	int timeout;
4530c3a8cd0SMatthew Dillon 	int count;
4540c3a8cd0SMatthew Dillon 	int wi;	/* wakeup pipe */
4550c3a8cd0SMatthew Dillon 	int si;	/* socket */
4560c3a8cd0SMatthew Dillon 	int ai;	/* alt bulk path socket */
4570c3a8cd0SMatthew Dillon 
4580c3a8cd0SMatthew Dillon 	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
459a2179323SMatthew Dillon 		/*
460a2179323SMatthew Dillon 		 * These iocom->flags are only manipulated within the
461a2179323SMatthew Dillon 		 * context of the current thread.  However, modifications
462a2179323SMatthew Dillon 		 * still require atomic ops.
463a2179323SMatthew Dillon 		 */
4640c3a8cd0SMatthew Dillon 		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
4650c3a8cd0SMatthew Dillon 				     DMSG_IOCOMF_WWORK |
4660c3a8cd0SMatthew Dillon 				     DMSG_IOCOMF_PWORK |
4670c3a8cd0SMatthew Dillon 				     DMSG_IOCOMF_SWORK |
4680c3a8cd0SMatthew Dillon 				     DMSG_IOCOMF_ARWORK |
4690c3a8cd0SMatthew Dillon 				     DMSG_IOCOMF_AWWORK)) == 0) {
4700c3a8cd0SMatthew Dillon 			/*
4710c3a8cd0SMatthew Dillon 			 * Only poll if no immediate work is pending.
4720c3a8cd0SMatthew Dillon 			 * Otherwise we are just wasting our time calling
4730c3a8cd0SMatthew Dillon 			 * poll.
4740c3a8cd0SMatthew Dillon 			 */
4750c3a8cd0SMatthew Dillon 			timeout = 5000;
4760c3a8cd0SMatthew Dillon 
4770c3a8cd0SMatthew Dillon 			count = 0;
4780c3a8cd0SMatthew Dillon 			wi = -1;
4790c3a8cd0SMatthew Dillon 			si = -1;
4800c3a8cd0SMatthew Dillon 			ai = -1;
4810c3a8cd0SMatthew Dillon 
4820c3a8cd0SMatthew Dillon 			/*
4830c3a8cd0SMatthew Dillon 			 * Always check the inter-thread pipe, e.g.
4840c3a8cd0SMatthew Dillon 			 * for iocom->txmsgq work.
4850c3a8cd0SMatthew Dillon 			 */
4860c3a8cd0SMatthew Dillon 			wi = count++;
4870c3a8cd0SMatthew Dillon 			fds[wi].fd = iocom->wakeupfds[0];
4880c3a8cd0SMatthew Dillon 			fds[wi].events = POLLIN;
4890c3a8cd0SMatthew Dillon 			fds[wi].revents = 0;
4900c3a8cd0SMatthew Dillon 
4910c3a8cd0SMatthew Dillon 			/*
4920c3a8cd0SMatthew Dillon 			 * Check the socket input/output direction as
4930c3a8cd0SMatthew Dillon 			 * requested
4940c3a8cd0SMatthew Dillon 			 */
4950c3a8cd0SMatthew Dillon 			if (iocom->flags & (DMSG_IOCOMF_RREQ |
4960c3a8cd0SMatthew Dillon 					    DMSG_IOCOMF_WREQ)) {
4970c3a8cd0SMatthew Dillon 				si = count++;
4980c3a8cd0SMatthew Dillon 				fds[si].fd = iocom->sock_fd;
4990c3a8cd0SMatthew Dillon 				fds[si].events = 0;
5000c3a8cd0SMatthew Dillon 				fds[si].revents = 0;
5010c3a8cd0SMatthew Dillon 
5020c3a8cd0SMatthew Dillon 				if (iocom->flags & DMSG_IOCOMF_RREQ)
5030c3a8cd0SMatthew Dillon 					fds[si].events |= POLLIN;
5040c3a8cd0SMatthew Dillon 				if (iocom->flags & DMSG_IOCOMF_WREQ)
5050c3a8cd0SMatthew Dillon 					fds[si].events |= POLLOUT;
5060c3a8cd0SMatthew Dillon 			}
5070c3a8cd0SMatthew Dillon 
5080c3a8cd0SMatthew Dillon 			/*
5090c3a8cd0SMatthew Dillon 			 * Check the alternative fd for work.
5100c3a8cd0SMatthew Dillon 			 */
5110c3a8cd0SMatthew Dillon 			if (iocom->alt_fd >= 0) {
5120c3a8cd0SMatthew Dillon 				ai = count++;
5130c3a8cd0SMatthew Dillon 				fds[ai].fd = iocom->alt_fd;
5140c3a8cd0SMatthew Dillon 				fds[ai].events = POLLIN;
5150c3a8cd0SMatthew Dillon 				fds[ai].revents = 0;
5160c3a8cd0SMatthew Dillon 			}
5170c3a8cd0SMatthew Dillon 			poll(fds, count, timeout);
5180c3a8cd0SMatthew Dillon 
5190c3a8cd0SMatthew Dillon 			if (wi >= 0 && (fds[wi].revents & POLLIN))
520a2179323SMatthew Dillon 				atomic_set_int(&iocom->flags,
521a2179323SMatthew Dillon 					       DMSG_IOCOMF_PWORK);
5220c3a8cd0SMatthew Dillon 			if (si >= 0 && (fds[si].revents & POLLIN))
523a2179323SMatthew Dillon 				atomic_set_int(&iocom->flags,
524a2179323SMatthew Dillon 					       DMSG_IOCOMF_RWORK);
5250c3a8cd0SMatthew Dillon 			if (si >= 0 && (fds[si].revents & POLLOUT))
526a2179323SMatthew Dillon 				atomic_set_int(&iocom->flags,
527a2179323SMatthew Dillon 					       DMSG_IOCOMF_WWORK);
5280c3a8cd0SMatthew Dillon 			if (wi >= 0 && (fds[wi].revents & POLLOUT))
529a2179323SMatthew Dillon 				atomic_set_int(&iocom->flags,
530a2179323SMatthew Dillon 					       DMSG_IOCOMF_WWORK);
5310c3a8cd0SMatthew Dillon 			if (ai >= 0 && (fds[ai].revents & POLLIN))
532a2179323SMatthew Dillon 				atomic_set_int(&iocom->flags,
533a2179323SMatthew Dillon 					       DMSG_IOCOMF_ARWORK);
5340c3a8cd0SMatthew Dillon 		} else {
5350c3a8cd0SMatthew Dillon 			/*
5360c3a8cd0SMatthew Dillon 			 * Always check the pipe
5370c3a8cd0SMatthew Dillon 			 */
538a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
5390c3a8cd0SMatthew Dillon 		}
5400c3a8cd0SMatthew Dillon 
5410c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_SWORK) {
542a2179323SMatthew Dillon 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
5430d20ec8aSMatthew Dillon 			iocom->signal_callback(iocom);
5440c3a8cd0SMatthew Dillon 		}
5450c3a8cd0SMatthew Dillon 
5460c3a8cd0SMatthew Dillon 		/*
5470c3a8cd0SMatthew Dillon 		 * Pending message queues from other threads wake us up
5480c3a8cd0SMatthew Dillon 		 * with a write to the wakeupfds[] pipe.  We have to clear
5490c3a8cd0SMatthew Dillon 		 * the pipe with a dummy read.
5500c3a8cd0SMatthew Dillon 		 */
5510c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_PWORK) {
552a2179323SMatthew Dillon 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
5530c3a8cd0SMatthew Dillon 			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
554a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
555a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
5560d20ec8aSMatthew Dillon 			if (TAILQ_FIRST(&iocom->txmsgq))
5570c3a8cd0SMatthew Dillon 				dmsg_iocom_flush1(iocom);
5580c3a8cd0SMatthew Dillon 		}
5590c3a8cd0SMatthew Dillon 
5600c3a8cd0SMatthew Dillon 		/*
5610c3a8cd0SMatthew Dillon 		 * Message write sequencing
5620c3a8cd0SMatthew Dillon 		 */
5630c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_WWORK)
5640c3a8cd0SMatthew Dillon 			dmsg_iocom_flush1(iocom);
5650c3a8cd0SMatthew Dillon 
5660c3a8cd0SMatthew Dillon 		/*
5670c3a8cd0SMatthew Dillon 		 * Message read sequencing.  Run this after the write
5680c3a8cd0SMatthew Dillon 		 * sequencing in case the write sequencing allowed another
5690c3a8cd0SMatthew Dillon 		 * auto-DELETE to occur on the read side.
5700c3a8cd0SMatthew Dillon 		 */
5710c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_RWORK) {
5720c3a8cd0SMatthew Dillon 			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
5730c3a8cd0SMatthew Dillon 			       (msg = dmsg_ioq_read(iocom)) != NULL) {
5740c3a8cd0SMatthew Dillon 				if (DMsgDebugOpt) {
5750c3a8cd0SMatthew Dillon 					fprintf(stderr, "receive %s\n",
5760c3a8cd0SMatthew Dillon 						dmsg_msg_str(msg));
5770c3a8cd0SMatthew Dillon 				}
5780d20ec8aSMatthew Dillon 				iocom->rcvmsg_callback(msg);
5790c3a8cd0SMatthew Dillon 				dmsg_state_cleanuprx(iocom, msg);
5800c3a8cd0SMatthew Dillon 			}
5810c3a8cd0SMatthew Dillon 		}
5820c3a8cd0SMatthew Dillon 
5830c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
584a2179323SMatthew Dillon 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
5850d20ec8aSMatthew Dillon 			iocom->altmsg_callback(iocom);
5860c3a8cd0SMatthew Dillon 		}
5870c3a8cd0SMatthew Dillon 	}
5880c3a8cd0SMatthew Dillon }
5890c3a8cd0SMatthew Dillon 
5900c3a8cd0SMatthew Dillon /*
5910c3a8cd0SMatthew Dillon  * Make sure there's enough room in the FIFO to hold the
5920c3a8cd0SMatthew Dillon  * needed data.
5930c3a8cd0SMatthew Dillon  *
5940c3a8cd0SMatthew Dillon  * Assume worst case encrypted form is 2x the size of the
5950c3a8cd0SMatthew Dillon  * plaintext equivalent.
5960c3a8cd0SMatthew Dillon  */
5970c3a8cd0SMatthew Dillon static
5980c3a8cd0SMatthew Dillon size_t
5990c3a8cd0SMatthew Dillon dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
6000c3a8cd0SMatthew Dillon {
6010c3a8cd0SMatthew Dillon 	size_t bytes;
6020c3a8cd0SMatthew Dillon 	size_t nmax;
6030c3a8cd0SMatthew Dillon 
6040c3a8cd0SMatthew Dillon 	bytes = ioq->fifo_cdx - ioq->fifo_beg;
6050c3a8cd0SMatthew Dillon 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
6060c3a8cd0SMatthew Dillon 	if (bytes + nmax / 2 < needed) {
6070c3a8cd0SMatthew Dillon 		if (bytes) {
6080c3a8cd0SMatthew Dillon 			bcopy(ioq->buf + ioq->fifo_beg,
6090c3a8cd0SMatthew Dillon 			      ioq->buf,
6100c3a8cd0SMatthew Dillon 			      bytes);
6110c3a8cd0SMatthew Dillon 		}
6120c3a8cd0SMatthew Dillon 		ioq->fifo_cdx -= ioq->fifo_beg;
6130c3a8cd0SMatthew Dillon 		ioq->fifo_beg = 0;
6140c3a8cd0SMatthew Dillon 		if (ioq->fifo_cdn < ioq->fifo_end) {
6150c3a8cd0SMatthew Dillon 			bcopy(ioq->buf + ioq->fifo_cdn,
6160c3a8cd0SMatthew Dillon 			      ioq->buf + ioq->fifo_cdx,
6170c3a8cd0SMatthew Dillon 			      ioq->fifo_end - ioq->fifo_cdn);
6180c3a8cd0SMatthew Dillon 		}
6190c3a8cd0SMatthew Dillon 		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
6200c3a8cd0SMatthew Dillon 		ioq->fifo_cdn = ioq->fifo_cdx;
6210c3a8cd0SMatthew Dillon 		nmax = sizeof(ioq->buf) - ioq->fifo_end;
6220c3a8cd0SMatthew Dillon 	}
6230c3a8cd0SMatthew Dillon 	return(nmax);
6240c3a8cd0SMatthew Dillon }
6250c3a8cd0SMatthew Dillon 
6260c3a8cd0SMatthew Dillon /*
6270c3a8cd0SMatthew Dillon  * Read the next ready message from the ioq, issuing I/O if needed.
6280c3a8cd0SMatthew Dillon  * Caller should retry on a read-event when NULL is returned.
6290c3a8cd0SMatthew Dillon  *
6300c3a8cd0SMatthew Dillon  * If an error occurs during reception a DMSG_LNK_ERROR msg will
6310c3a8cd0SMatthew Dillon  * be returned for each open transaction, then the ioq and iocom
6320c3a8cd0SMatthew Dillon  * will be errored out and a non-transactional DMSG_LNK_ERROR
6330c3a8cd0SMatthew Dillon  * msg will be returned as the final message.  The caller should not call
6340c3a8cd0SMatthew Dillon  * us again after the final message is returned.
6350c3a8cd0SMatthew Dillon  *
6360c3a8cd0SMatthew Dillon  * Thread localized, iocom->mtx not held.
6370c3a8cd0SMatthew Dillon  */
6380c3a8cd0SMatthew Dillon dmsg_msg_t *
6390c3a8cd0SMatthew Dillon dmsg_ioq_read(dmsg_iocom_t *iocom)
6400c3a8cd0SMatthew Dillon {
6410c3a8cd0SMatthew Dillon 	dmsg_ioq_t *ioq = &iocom->ioq_rx;
6420c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
6430c3a8cd0SMatthew Dillon 	dmsg_state_t *state;
6440c3a8cd0SMatthew Dillon 	dmsg_hdr_t *head;
6450c3a8cd0SMatthew Dillon 	ssize_t n;
6460c3a8cd0SMatthew Dillon 	size_t bytes;
6470c3a8cd0SMatthew Dillon 	size_t nmax;
648f306de83SMatthew Dillon 	uint32_t aux_size;
6490c3a8cd0SMatthew Dillon 	uint32_t xcrc32;
6500c3a8cd0SMatthew Dillon 	int error;
6510c3a8cd0SMatthew Dillon 
6520c3a8cd0SMatthew Dillon again:
6530c3a8cd0SMatthew Dillon 	/*
6540c3a8cd0SMatthew Dillon 	 * If a message is already pending we can just remove and
6550c3a8cd0SMatthew Dillon 	 * return it.  Message state has already been processed.
6560c3a8cd0SMatthew Dillon 	 * (currently not implemented)
6570c3a8cd0SMatthew Dillon 	 */
6580c3a8cd0SMatthew Dillon 	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
6590c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
6600c3a8cd0SMatthew Dillon 		return (msg);
6610c3a8cd0SMatthew Dillon 	}
662a2179323SMatthew Dillon 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
6630c3a8cd0SMatthew Dillon 
6640c3a8cd0SMatthew Dillon 	/*
6650c3a8cd0SMatthew Dillon 	 * If the stream is errored out we stop processing it.
6660c3a8cd0SMatthew Dillon 	 */
6670c3a8cd0SMatthew Dillon 	if (ioq->error)
6680c3a8cd0SMatthew Dillon 		goto skip;
6690c3a8cd0SMatthew Dillon 
6700c3a8cd0SMatthew Dillon 	/*
6710c3a8cd0SMatthew Dillon 	 * Message read in-progress (msg is NULL at the moment).  We don't
6720c3a8cd0SMatthew Dillon 	 * allocate a msg until we have its core header.
6730c3a8cd0SMatthew Dillon 	 */
6740c3a8cd0SMatthew Dillon 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
6750c3a8cd0SMatthew Dillon 	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
6760c3a8cd0SMatthew Dillon 	msg = ioq->msg;
6770c3a8cd0SMatthew Dillon 
6780c3a8cd0SMatthew Dillon 	switch(ioq->state) {
6790c3a8cd0SMatthew Dillon 	case DMSG_MSGQ_STATE_HEADER1:
6800c3a8cd0SMatthew Dillon 		/*
6810c3a8cd0SMatthew Dillon 		 * Load the primary header, fail on any non-trivial read
6820c3a8cd0SMatthew Dillon 		 * error or on EOF.  Since the primary header is the same
6830c3a8cd0SMatthew Dillon 		 * size is the message alignment it will never straddle
6840c3a8cd0SMatthew Dillon 		 * the end of the buffer.
6850c3a8cd0SMatthew Dillon 		 */
6860c3a8cd0SMatthew Dillon 		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
6870c3a8cd0SMatthew Dillon 		if (bytes < sizeof(msg->any.head)) {
6880c3a8cd0SMatthew Dillon 			n = read(iocom->sock_fd,
6890c3a8cd0SMatthew Dillon 				 ioq->buf + ioq->fifo_end,
6900c3a8cd0SMatthew Dillon 				 nmax);
6910c3a8cd0SMatthew Dillon 			if (n <= 0) {
6920c3a8cd0SMatthew Dillon 				if (n == 0) {
6930c3a8cd0SMatthew Dillon 					ioq->error = DMSG_IOQ_ERROR_EOF;
6940c3a8cd0SMatthew Dillon 					break;
6950c3a8cd0SMatthew Dillon 				}
6960c3a8cd0SMatthew Dillon 				if (errno != EINTR &&
6970c3a8cd0SMatthew Dillon 				    errno != EINPROGRESS &&
6980c3a8cd0SMatthew Dillon 				    errno != EAGAIN) {
6990c3a8cd0SMatthew Dillon 					ioq->error = DMSG_IOQ_ERROR_SOCK;
7000c3a8cd0SMatthew Dillon 					break;
7010c3a8cd0SMatthew Dillon 				}
7020c3a8cd0SMatthew Dillon 				n = 0;
7030c3a8cd0SMatthew Dillon 				/* fall through */
7040c3a8cd0SMatthew Dillon 			}
7050c3a8cd0SMatthew Dillon 			ioq->fifo_end += (size_t)n;
7060c3a8cd0SMatthew Dillon 			nmax -= (size_t)n;
7070c3a8cd0SMatthew Dillon 		}
7080c3a8cd0SMatthew Dillon 
7090c3a8cd0SMatthew Dillon 		/*
7100c3a8cd0SMatthew Dillon 		 * Decrypt data received so far.  Data will be decrypted
7110c3a8cd0SMatthew Dillon 		 * in-place but might create gaps in the FIFO.  Partial
7120c3a8cd0SMatthew Dillon 		 * blocks are not immediately decrypted.
7130c3a8cd0SMatthew Dillon 		 *
7140c3a8cd0SMatthew Dillon 		 * WARNING!  The header might be in the wrong endian, we
7150c3a8cd0SMatthew Dillon 		 *	     do not fix it up until we get the entire
7160c3a8cd0SMatthew Dillon 		 *	     extended header.
7170c3a8cd0SMatthew Dillon 		 */
7180c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
7190c3a8cd0SMatthew Dillon 			dmsg_crypto_decrypt(iocom, ioq);
7200c3a8cd0SMatthew Dillon 		} else {
7210c3a8cd0SMatthew Dillon 			ioq->fifo_cdx = ioq->fifo_end;
7220c3a8cd0SMatthew Dillon 			ioq->fifo_cdn = ioq->fifo_end;
7230c3a8cd0SMatthew Dillon 		}
7240c3a8cd0SMatthew Dillon 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
7250c3a8cd0SMatthew Dillon 
7260c3a8cd0SMatthew Dillon 		/*
7270c3a8cd0SMatthew Dillon 		 * Insufficient data accumulated (msg is NULL, caller will
7280c3a8cd0SMatthew Dillon 		 * retry on event).
7290c3a8cd0SMatthew Dillon 		 */
7300c3a8cd0SMatthew Dillon 		assert(msg == NULL);
7310c3a8cd0SMatthew Dillon 		if (bytes < sizeof(msg->any.head))
7320c3a8cd0SMatthew Dillon 			break;
7330c3a8cd0SMatthew Dillon 
7340c3a8cd0SMatthew Dillon 		/*
7350c3a8cd0SMatthew Dillon 		 * Check and fixup the core header.  Note that the icrc
7360c3a8cd0SMatthew Dillon 		 * has to be calculated before any fixups, but the crc
7370c3a8cd0SMatthew Dillon 		 * fields in the msg may have to be swapped like everything
7380c3a8cd0SMatthew Dillon 		 * else.
7390c3a8cd0SMatthew Dillon 		 */
7400c3a8cd0SMatthew Dillon 		head = (void *)(ioq->buf + ioq->fifo_beg);
7410c3a8cd0SMatthew Dillon 		if (head->magic != DMSG_HDR_MAGIC &&
7420c3a8cd0SMatthew Dillon 		    head->magic != DMSG_HDR_MAGIC_REV) {
743f306de83SMatthew Dillon 			fprintf(stderr, "%s: head->magic is bad %02x\n",
744f306de83SMatthew Dillon 				iocom->label, head->magic);
745f306de83SMatthew Dillon 			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
746f306de83SMatthew Dillon 				fprintf(stderr, "(on encrypted link)\n");
7470c3a8cd0SMatthew Dillon 			ioq->error = DMSG_IOQ_ERROR_SYNC;
7480c3a8cd0SMatthew Dillon 			break;
7490c3a8cd0SMatthew Dillon 		}
7500c3a8cd0SMatthew Dillon 
7510c3a8cd0SMatthew Dillon 		/*
7520c3a8cd0SMatthew Dillon 		 * Calculate the full header size and aux data size
7530c3a8cd0SMatthew Dillon 		 */
7540c3a8cd0SMatthew Dillon 		if (head->magic == DMSG_HDR_MAGIC_REV) {
7550c3a8cd0SMatthew Dillon 			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
7560c3a8cd0SMatthew Dillon 				      DMSG_ALIGN;
757f306de83SMatthew Dillon 			aux_size = bswap32(head->aux_bytes);
7580c3a8cd0SMatthew Dillon 		} else {
7590c3a8cd0SMatthew Dillon 			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
7600c3a8cd0SMatthew Dillon 				      DMSG_ALIGN;
761f306de83SMatthew Dillon 			aux_size = head->aux_bytes;
7620c3a8cd0SMatthew Dillon 		}
763f306de83SMatthew Dillon 		ioq->abytes = DMSG_DOALIGN(aux_size);
764f306de83SMatthew Dillon 		ioq->unaligned_aux_size = aux_size;
7650c3a8cd0SMatthew Dillon 		if (ioq->hbytes < sizeof(msg->any.head) ||
7660c3a8cd0SMatthew Dillon 		    ioq->hbytes > sizeof(msg->any) ||
7670c3a8cd0SMatthew Dillon 		    ioq->abytes > DMSG_AUX_MAX) {
7680c3a8cd0SMatthew Dillon 			ioq->error = DMSG_IOQ_ERROR_FIELD;
7690c3a8cd0SMatthew Dillon 			break;
7700c3a8cd0SMatthew Dillon 		}
7710c3a8cd0SMatthew Dillon 
7720c3a8cd0SMatthew Dillon 		/*
7730c3a8cd0SMatthew Dillon 		 * Allocate the message, the next state will fill it in.
7741b8eded1SMatthew Dillon 		 *
7751b8eded1SMatthew Dillon 		 * NOTE: The aux_data buffer will be sized to an aligned
7761b8eded1SMatthew Dillon 		 *	 value and the aligned remainder zero'd for
7771b8eded1SMatthew Dillon 		 *	 convenience.
7781b8eded1SMatthew Dillon 		 *
7791b8eded1SMatthew Dillon 		 * NOTE: Supply dummy state and a degenerate cmd without
7801b8eded1SMatthew Dillon 		 *	 CREATE set.  The message will temporarily be
7811b8eded1SMatthew Dillon 		 *	 associated with state0 until later post-processing.
7820c3a8cd0SMatthew Dillon 		 */
7831b8eded1SMatthew Dillon 		msg = dmsg_msg_alloc(&iocom->state0, aux_size,
784a2179323SMatthew Dillon 				     ioq->hbytes / DMSG_ALIGN,
7850c3a8cd0SMatthew Dillon 				     NULL, NULL);
7860c3a8cd0SMatthew Dillon 		ioq->msg = msg;
7870c3a8cd0SMatthew Dillon 
7880c3a8cd0SMatthew Dillon 		/*
7890c3a8cd0SMatthew Dillon 		 * Fall through to the next state.  Make sure that the
7900c3a8cd0SMatthew Dillon 		 * extended header does not straddle the end of the buffer.
7910c3a8cd0SMatthew Dillon 		 * We still want to issue larger reads into our buffer,
7920c3a8cd0SMatthew Dillon 		 * book-keeping is easier if we don't bcopy() yet.
7930c3a8cd0SMatthew Dillon 		 *
7940c3a8cd0SMatthew Dillon 		 * Make sure there is enough room for bloated encrypt data.
7950c3a8cd0SMatthew Dillon 		 */
7960c3a8cd0SMatthew Dillon 		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
7970c3a8cd0SMatthew Dillon 		ioq->state = DMSG_MSGQ_STATE_HEADER2;
7980c3a8cd0SMatthew Dillon 		/* fall through */
7990c3a8cd0SMatthew Dillon 	case DMSG_MSGQ_STATE_HEADER2:
8000c3a8cd0SMatthew Dillon 		/*
8010c3a8cd0SMatthew Dillon 		 * Fill out the extended header.
8020c3a8cd0SMatthew Dillon 		 */
8030c3a8cd0SMatthew Dillon 		assert(msg != NULL);
8040c3a8cd0SMatthew Dillon 		if (bytes < ioq->hbytes) {
8050c3a8cd0SMatthew Dillon 			n = read(iocom->sock_fd,
8060c3a8cd0SMatthew Dillon 				 ioq->buf + ioq->fifo_end,
8070c3a8cd0SMatthew Dillon 				 nmax);
8080c3a8cd0SMatthew Dillon 			if (n <= 0) {
8090c3a8cd0SMatthew Dillon 				if (n == 0) {
8100c3a8cd0SMatthew Dillon 					ioq->error = DMSG_IOQ_ERROR_EOF;
8110c3a8cd0SMatthew Dillon 					break;
8120c3a8cd0SMatthew Dillon 				}
8130c3a8cd0SMatthew Dillon 				if (errno != EINTR &&
8140c3a8cd0SMatthew Dillon 				    errno != EINPROGRESS &&
8150c3a8cd0SMatthew Dillon 				    errno != EAGAIN) {
8160c3a8cd0SMatthew Dillon 					ioq->error = DMSG_IOQ_ERROR_SOCK;
8170c3a8cd0SMatthew Dillon 					break;
8180c3a8cd0SMatthew Dillon 				}
8190c3a8cd0SMatthew Dillon 				n = 0;
8200c3a8cd0SMatthew Dillon 				/* fall through */
8210c3a8cd0SMatthew Dillon 			}
8220c3a8cd0SMatthew Dillon 			ioq->fifo_end += (size_t)n;
8230c3a8cd0SMatthew Dillon 			nmax -= (size_t)n;
8240c3a8cd0SMatthew Dillon 		}
8250c3a8cd0SMatthew Dillon 
8260c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
8270c3a8cd0SMatthew Dillon 			dmsg_crypto_decrypt(iocom, ioq);
8280c3a8cd0SMatthew Dillon 		} else {
8290c3a8cd0SMatthew Dillon 			ioq->fifo_cdx = ioq->fifo_end;
8300c3a8cd0SMatthew Dillon 			ioq->fifo_cdn = ioq->fifo_end;
8310c3a8cd0SMatthew Dillon 		}
8320c3a8cd0SMatthew Dillon 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
8330c3a8cd0SMatthew Dillon 
8340c3a8cd0SMatthew Dillon 		/*
8350c3a8cd0SMatthew Dillon 		 * Insufficient data accumulated (set msg NULL so caller will
8360c3a8cd0SMatthew Dillon 		 * retry on event).
8370c3a8cd0SMatthew Dillon 		 */
8380c3a8cd0SMatthew Dillon 		if (bytes < ioq->hbytes) {
8390c3a8cd0SMatthew Dillon 			msg = NULL;
8400c3a8cd0SMatthew Dillon 			break;
8410c3a8cd0SMatthew Dillon 		}
8420c3a8cd0SMatthew Dillon 
8430c3a8cd0SMatthew Dillon 		/*
8440c3a8cd0SMatthew Dillon 		 * Calculate the extended header, decrypt data received
8450c3a8cd0SMatthew Dillon 		 * so far.  Handle endian-conversion for the entire extended
8460c3a8cd0SMatthew Dillon 		 * header.
8470c3a8cd0SMatthew Dillon 		 */
8480c3a8cd0SMatthew Dillon 		head = (void *)(ioq->buf + ioq->fifo_beg);
8490c3a8cd0SMatthew Dillon 
8500c3a8cd0SMatthew Dillon 		/*
8510c3a8cd0SMatthew Dillon 		 * Check the CRC.
8520c3a8cd0SMatthew Dillon 		 */
8530c3a8cd0SMatthew Dillon 		if (head->magic == DMSG_HDR_MAGIC_REV)
8540c3a8cd0SMatthew Dillon 			xcrc32 = bswap32(head->hdr_crc);
8550c3a8cd0SMatthew Dillon 		else
8560c3a8cd0SMatthew Dillon 			xcrc32 = head->hdr_crc;
8570c3a8cd0SMatthew Dillon 		head->hdr_crc = 0;
8580c3a8cd0SMatthew Dillon 		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
8590c3a8cd0SMatthew Dillon 			ioq->error = DMSG_IOQ_ERROR_XCRC;
8600c3a8cd0SMatthew Dillon 			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
8610c3a8cd0SMatthew Dillon 				xcrc32, dmsg_icrc32(head, ioq->hbytes),
8620c3a8cd0SMatthew Dillon 				dmsg_msg_str(msg));
8630c3a8cd0SMatthew Dillon 			assert(0);
8640c3a8cd0SMatthew Dillon 			break;
8650c3a8cd0SMatthew Dillon 		}
8660c3a8cd0SMatthew Dillon 		head->hdr_crc = xcrc32;
8670c3a8cd0SMatthew Dillon 
8680c3a8cd0SMatthew Dillon 		if (head->magic == DMSG_HDR_MAGIC_REV) {
8690c3a8cd0SMatthew Dillon 			dmsg_bswap_head(head);
8700c3a8cd0SMatthew Dillon 		}
8710c3a8cd0SMatthew Dillon 
8720c3a8cd0SMatthew Dillon 		/*
8730c3a8cd0SMatthew Dillon 		 * Copy the extended header into the msg and adjust the
8740c3a8cd0SMatthew Dillon 		 * FIFO.
8750c3a8cd0SMatthew Dillon 		 */
8760c3a8cd0SMatthew Dillon 		bcopy(head, &msg->any, ioq->hbytes);
8770c3a8cd0SMatthew Dillon 
8780c3a8cd0SMatthew Dillon 		/*
8790c3a8cd0SMatthew Dillon 		 * We are either done or we fall-through.
8800c3a8cd0SMatthew Dillon 		 */
8810c3a8cd0SMatthew Dillon 		if (ioq->abytes == 0) {
8820c3a8cd0SMatthew Dillon 			ioq->fifo_beg += ioq->hbytes;
8830c3a8cd0SMatthew Dillon 			break;
8840c3a8cd0SMatthew Dillon 		}
8850c3a8cd0SMatthew Dillon 
8860c3a8cd0SMatthew Dillon 		/*
8870c3a8cd0SMatthew Dillon 		 * Must adjust bytes (and the state) when falling through.
8880c3a8cd0SMatthew Dillon 		 * nmax doesn't change.
8890c3a8cd0SMatthew Dillon 		 */
8900c3a8cd0SMatthew Dillon 		ioq->fifo_beg += ioq->hbytes;
8910c3a8cd0SMatthew Dillon 		bytes -= ioq->hbytes;
8920c3a8cd0SMatthew Dillon 		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
8930c3a8cd0SMatthew Dillon 		/* fall through */
8940c3a8cd0SMatthew Dillon 	case DMSG_MSGQ_STATE_AUXDATA1:
8950c3a8cd0SMatthew Dillon 		/*
896a2179323SMatthew Dillon 		 * Copy the partial or complete [decrypted] payload from
897a2179323SMatthew Dillon 		 * remaining bytes in the FIFO in order to optimize the
898a2179323SMatthew Dillon 		 * makeroom call in the AUXDATA2 state.  We have to
899a2179323SMatthew Dillon 		 * fall-through either way so we can check the crc.
9000c3a8cd0SMatthew Dillon 		 *
9010c3a8cd0SMatthew Dillon 		 * msg->aux_size tracks our aux data.
902a2179323SMatthew Dillon 		 *
903a2179323SMatthew Dillon 		 * (Lets not complicate matters if the data is encrypted,
904a2179323SMatthew Dillon 		 *  since the data in-stream is not the same size as the
905a2179323SMatthew Dillon 		 *  data decrypted).
9060c3a8cd0SMatthew Dillon 		 */
9070c3a8cd0SMatthew Dillon 		if (bytes >= ioq->abytes) {
9080c3a8cd0SMatthew Dillon 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9090c3a8cd0SMatthew Dillon 			      ioq->abytes);
9100c3a8cd0SMatthew Dillon 			msg->aux_size = ioq->abytes;
9110c3a8cd0SMatthew Dillon 			ioq->fifo_beg += ioq->abytes;
9120c3a8cd0SMatthew Dillon 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
9130c3a8cd0SMatthew Dillon 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
9140c3a8cd0SMatthew Dillon 			bytes -= ioq->abytes;
9150c3a8cd0SMatthew Dillon 		} else if (bytes) {
9160c3a8cd0SMatthew Dillon 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9170c3a8cd0SMatthew Dillon 			      bytes);
9180c3a8cd0SMatthew Dillon 			msg->aux_size = bytes;
9190c3a8cd0SMatthew Dillon 			ioq->fifo_beg += bytes;
9200c3a8cd0SMatthew Dillon 			if (ioq->fifo_cdx < ioq->fifo_beg)
9210c3a8cd0SMatthew Dillon 				ioq->fifo_cdx = ioq->fifo_beg;
9220c3a8cd0SMatthew Dillon 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
9230c3a8cd0SMatthew Dillon 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
9240c3a8cd0SMatthew Dillon 			bytes = 0;
9250c3a8cd0SMatthew Dillon 		} else {
9260c3a8cd0SMatthew Dillon 			msg->aux_size = 0;
9270c3a8cd0SMatthew Dillon 		}
9280c3a8cd0SMatthew Dillon 		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
9290c3a8cd0SMatthew Dillon 		/* fall through */
9300c3a8cd0SMatthew Dillon 	case DMSG_MSGQ_STATE_AUXDATA2:
9310c3a8cd0SMatthew Dillon 		/*
9320c3a8cd0SMatthew Dillon 		 * Make sure there is enough room for more data.
9330c3a8cd0SMatthew Dillon 		 */
9340c3a8cd0SMatthew Dillon 		assert(msg);
9350c3a8cd0SMatthew Dillon 		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
9360c3a8cd0SMatthew Dillon 
9370c3a8cd0SMatthew Dillon 		/*
9380c3a8cd0SMatthew Dillon 		 * Read and decrypt more of the payload.
9390c3a8cd0SMatthew Dillon 		 */
9400c3a8cd0SMatthew Dillon 		if (msg->aux_size < ioq->abytes) {
9410c3a8cd0SMatthew Dillon 			assert(bytes == 0);
9420c3a8cd0SMatthew Dillon 			n = read(iocom->sock_fd,
9430c3a8cd0SMatthew Dillon 				 ioq->buf + ioq->fifo_end,
9440c3a8cd0SMatthew Dillon 				 nmax);
9450c3a8cd0SMatthew Dillon 			if (n <= 0) {
9460c3a8cd0SMatthew Dillon 				if (n == 0) {
9470c3a8cd0SMatthew Dillon 					ioq->error = DMSG_IOQ_ERROR_EOF;
9480c3a8cd0SMatthew Dillon 					break;
9490c3a8cd0SMatthew Dillon 				}
9500c3a8cd0SMatthew Dillon 				if (errno != EINTR &&
9510c3a8cd0SMatthew Dillon 				    errno != EINPROGRESS &&
9520c3a8cd0SMatthew Dillon 				    errno != EAGAIN) {
9530c3a8cd0SMatthew Dillon 					ioq->error = DMSG_IOQ_ERROR_SOCK;
9540c3a8cd0SMatthew Dillon 					break;
9550c3a8cd0SMatthew Dillon 				}
9560c3a8cd0SMatthew Dillon 				n = 0;
9570c3a8cd0SMatthew Dillon 				/* fall through */
9580c3a8cd0SMatthew Dillon 			}
9590c3a8cd0SMatthew Dillon 			ioq->fifo_end += (size_t)n;
9600c3a8cd0SMatthew Dillon 			nmax -= (size_t)n;
9610c3a8cd0SMatthew Dillon 		}
9620c3a8cd0SMatthew Dillon 
9630c3a8cd0SMatthew Dillon 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
9640c3a8cd0SMatthew Dillon 			dmsg_crypto_decrypt(iocom, ioq);
9650c3a8cd0SMatthew Dillon 		} else {
9660c3a8cd0SMatthew Dillon 			ioq->fifo_cdx = ioq->fifo_end;
9670c3a8cd0SMatthew Dillon 			ioq->fifo_cdn = ioq->fifo_end;
9680c3a8cd0SMatthew Dillon 		}
9690c3a8cd0SMatthew Dillon 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
9700c3a8cd0SMatthew Dillon 
9710c3a8cd0SMatthew Dillon 		if (bytes > ioq->abytes - msg->aux_size)
9720c3a8cd0SMatthew Dillon 			bytes = ioq->abytes - msg->aux_size;
9730c3a8cd0SMatthew Dillon 
9740c3a8cd0SMatthew Dillon 		if (bytes) {
9750c3a8cd0SMatthew Dillon 			bcopy(ioq->buf + ioq->fifo_beg,
9760c3a8cd0SMatthew Dillon 			      msg->aux_data + msg->aux_size,
9770c3a8cd0SMatthew Dillon 			      bytes);
9780c3a8cd0SMatthew Dillon 			msg->aux_size += bytes;
9790c3a8cd0SMatthew Dillon 			ioq->fifo_beg += bytes;
9800c3a8cd0SMatthew Dillon 		}
9810c3a8cd0SMatthew Dillon 
9820c3a8cd0SMatthew Dillon 		/*
9830c3a8cd0SMatthew Dillon 		 * Insufficient data accumulated (set msg NULL so caller will
9840c3a8cd0SMatthew Dillon 		 * retry on event).
985f306de83SMatthew Dillon 		 *
986f306de83SMatthew Dillon 		 * Assert the auxillary data size is correct, then record the
987f306de83SMatthew Dillon 		 * original unaligned size from the message header.
9880c3a8cd0SMatthew Dillon 		 */
9890c3a8cd0SMatthew Dillon 		if (msg->aux_size < ioq->abytes) {
9900c3a8cd0SMatthew Dillon 			msg = NULL;
9910c3a8cd0SMatthew Dillon 			break;
9920c3a8cd0SMatthew Dillon 		}
9930c3a8cd0SMatthew Dillon 		assert(msg->aux_size == ioq->abytes);
994f306de83SMatthew Dillon 		msg->aux_size = ioq->unaligned_aux_size;
9950c3a8cd0SMatthew Dillon 
9960c3a8cd0SMatthew Dillon 		/*
997f306de83SMatthew Dillon 		 * Check aux_crc, then we are done.  Note that the crc
998f306de83SMatthew Dillon 		 * is calculated over the aligned size, not the actual
999f306de83SMatthew Dillon 		 * size.
10000c3a8cd0SMatthew Dillon 		 */
1001f306de83SMatthew Dillon 		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
10020c3a8cd0SMatthew Dillon 		if (xcrc32 != msg->any.head.aux_crc) {
10030c3a8cd0SMatthew Dillon 			ioq->error = DMSG_IOQ_ERROR_ACRC;
1004d30cab67SMatthew Dillon 			fprintf(stderr,
1005d30cab67SMatthew Dillon 				"iocom: ACRC error %08x vs %08x "
1006d30cab67SMatthew Dillon 				"msgid %016jx msgcmd %08x auxsize %d\n",
1007d30cab67SMatthew Dillon 				xcrc32,
1008d30cab67SMatthew Dillon 				msg->any.head.aux_crc,
1009d30cab67SMatthew Dillon 				(intmax_t)msg->any.head.msgid,
1010d30cab67SMatthew Dillon 				msg->any.head.cmd,
1011d30cab67SMatthew Dillon 				msg->any.head.aux_bytes);
10120c3a8cd0SMatthew Dillon 			break;
10130c3a8cd0SMatthew Dillon 		}
10140c3a8cd0SMatthew Dillon 		break;
10150c3a8cd0SMatthew Dillon 	case DMSG_MSGQ_STATE_ERROR:
10160c3a8cd0SMatthew Dillon 		/*
10170c3a8cd0SMatthew Dillon 		 * Continued calls to drain recorded transactions (returning
10180c3a8cd0SMatthew Dillon 		 * a LNK_ERROR for each one), before we return the final
10190c3a8cd0SMatthew Dillon 		 * LNK_ERROR.
10200c3a8cd0SMatthew Dillon 		 */
10210c3a8cd0SMatthew Dillon 		assert(msg == NULL);
10220c3a8cd0SMatthew Dillon 		break;
10230c3a8cd0SMatthew Dillon 	default:
10240c3a8cd0SMatthew Dillon 		/*
10250c3a8cd0SMatthew Dillon 		 * We don't double-return errors, the caller should not
10260c3a8cd0SMatthew Dillon 		 * have called us again after getting an error msg.
10270c3a8cd0SMatthew Dillon 		 */
10280c3a8cd0SMatthew Dillon 		assert(0);
10290c3a8cd0SMatthew Dillon 		break;
10300c3a8cd0SMatthew Dillon 	}
10310c3a8cd0SMatthew Dillon 
10320c3a8cd0SMatthew Dillon 	/*
10330c3a8cd0SMatthew Dillon 	 * Check the message sequence.  The iv[] should prevent any
10340c3a8cd0SMatthew Dillon 	 * possibility of a replay but we add this check anyway.
10350c3a8cd0SMatthew Dillon 	 */
10360c3a8cd0SMatthew Dillon 	if (msg && ioq->error == 0) {
10370c3a8cd0SMatthew Dillon 		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
10380c3a8cd0SMatthew Dillon 			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
10390c3a8cd0SMatthew Dillon 		} else {
10400c3a8cd0SMatthew Dillon 			++ioq->seq;
10410c3a8cd0SMatthew Dillon 		}
10420c3a8cd0SMatthew Dillon 	}
10430c3a8cd0SMatthew Dillon 
10440c3a8cd0SMatthew Dillon 	/*
10450c3a8cd0SMatthew Dillon 	 * Handle error, RREQ, or completion
10460c3a8cd0SMatthew Dillon 	 *
10470c3a8cd0SMatthew Dillon 	 * NOTE: nmax and bytes are invalid at this point, we don't bother
10480c3a8cd0SMatthew Dillon 	 *	 to update them when breaking out.
10490c3a8cd0SMatthew Dillon 	 */
10500c3a8cd0SMatthew Dillon 	if (ioq->error) {
1051323c0947SMatthew Dillon 		dmsg_state_t *tmp_state;
10520c3a8cd0SMatthew Dillon skip:
1053a2179323SMatthew Dillon 		fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
10540c3a8cd0SMatthew Dillon 		/*
10550c3a8cd0SMatthew Dillon 		 * An unrecoverable error causes all active receive
10560c3a8cd0SMatthew Dillon 		 * transactions to be terminated with a LNK_ERROR message.
10570c3a8cd0SMatthew Dillon 		 *
10580c3a8cd0SMatthew Dillon 		 * Once all active transactions are exhausted we set the
10590c3a8cd0SMatthew Dillon 		 * iocom ERROR flag and return a non-transactional LNK_ERROR
10600c3a8cd0SMatthew Dillon 		 * message, which should cause master processing loops to
10610c3a8cd0SMatthew Dillon 		 * terminate.
10620c3a8cd0SMatthew Dillon 		 */
10630c3a8cd0SMatthew Dillon 		assert(ioq->msg == msg);
10640c3a8cd0SMatthew Dillon 		if (msg) {
10650c3a8cd0SMatthew Dillon 			dmsg_msg_free(msg);
10660c3a8cd0SMatthew Dillon 			ioq->msg = NULL;
1067323c0947SMatthew Dillon 			msg = NULL;
10680c3a8cd0SMatthew Dillon 		}
10690c3a8cd0SMatthew Dillon 
10700c3a8cd0SMatthew Dillon 		/*
10710c3a8cd0SMatthew Dillon 		 * No more I/O read processing
10720c3a8cd0SMatthew Dillon 		 */
10730c3a8cd0SMatthew Dillon 		ioq->state = DMSG_MSGQ_STATE_ERROR;
10740c3a8cd0SMatthew Dillon 
10750c3a8cd0SMatthew Dillon 		/*
10760c3a8cd0SMatthew Dillon 		 * Simulate a remote LNK_ERROR DELETE msg for any open
10770c3a8cd0SMatthew Dillon 		 * transactions, ending with a final non-transactional
10780c3a8cd0SMatthew Dillon 		 * LNK_ERROR (that the session can detect) when no
10790c3a8cd0SMatthew Dillon 		 * transactions remain.
10800d20ec8aSMatthew Dillon 		 *
10811b8eded1SMatthew Dillon 		 * NOTE: Temporarily supply state0 and a degenerate cmd
10821b8eded1SMatthew Dillon 		 *	 without CREATE set.  The real state will be
10831b8eded1SMatthew Dillon 		 *	 assigned in the loop.
10841b8eded1SMatthew Dillon 		 *
10851b8eded1SMatthew Dillon 		 * NOTE: We are simulating a received message using our
10861b8eded1SMatthew Dillon 		 *	 side of the state, so the DMSGF_REV* bits have
10871b8eded1SMatthew Dillon 		 *	 to be reversed.
10880c3a8cd0SMatthew Dillon 		 */
10890c3a8cd0SMatthew Dillon 		pthread_mutex_lock(&iocom->mtx);
10900c3a8cd0SMatthew Dillon 		dmsg_iocom_drain(iocom);
10910d20ec8aSMatthew Dillon 
1092323c0947SMatthew Dillon 		tmp_state = NULL;
1093323c0947SMatthew Dillon 		RB_FOREACH(state, dmsg_state_tree, &iocom->staterd_tree) {
1094323c0947SMatthew Dillon 			atomic_set_int(&state->flags, DMSG_STATE_DYING);
1095323c0947SMatthew Dillon 			if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1096323c0947SMatthew Dillon 				tmp_state = state;
1097323c0947SMatthew Dillon 		}
1098323c0947SMatthew Dillon 		RB_FOREACH(state, dmsg_state_tree, &iocom->statewr_tree) {
1099323c0947SMatthew Dillon 			atomic_set_int(&state->flags, DMSG_STATE_DYING);
1100323c0947SMatthew Dillon 			if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1101323c0947SMatthew Dillon 				tmp_state = state;
1102323c0947SMatthew Dillon 		}
1103323c0947SMatthew Dillon 
1104323c0947SMatthew Dillon 		if (tmp_state) {
1105323c0947SMatthew Dillon 			dmsg_msg_simulate_failure(tmp_state, ioq->error);
11060c3a8cd0SMatthew Dillon 		} else {
1107323c0947SMatthew Dillon 			dmsg_msg_simulate_failure(&iocom->state0, ioq->error);
11080c3a8cd0SMatthew Dillon 		}
11090c3a8cd0SMatthew Dillon 		pthread_mutex_unlock(&iocom->mtx);
1110323c0947SMatthew Dillon 		if (TAILQ_FIRST(&ioq->msgq))
1111323c0947SMatthew Dillon 			goto again;
11120c3a8cd0SMatthew Dillon 
1113323c0947SMatthew Dillon #if 0
11140c3a8cd0SMatthew Dillon 		/*
11150c3a8cd0SMatthew Dillon 		 * For the iocom error case we want to set RWORK to indicate
11160c3a8cd0SMatthew Dillon 		 * that more messages might be pending.
11170c3a8cd0SMatthew Dillon 		 *
11180c3a8cd0SMatthew Dillon 		 * It is possible to return NULL when there is more work to
11190c3a8cd0SMatthew Dillon 		 * do because each message has to be DELETEd in both
11200c3a8cd0SMatthew Dillon 		 * directions before we continue on with the next (though
11210c3a8cd0SMatthew Dillon 		 * this could be optimized).  The transmit direction will
11220c3a8cd0SMatthew Dillon 		 * re-set RWORK.
11230c3a8cd0SMatthew Dillon 		 */
11240c3a8cd0SMatthew Dillon 		if (msg)
1125a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1126323c0947SMatthew Dillon #endif
11270c3a8cd0SMatthew Dillon 	} else if (msg == NULL) {
11280c3a8cd0SMatthew Dillon 		/*
11290c3a8cd0SMatthew Dillon 		 * Insufficient data received to finish building the message,
11300c3a8cd0SMatthew Dillon 		 * set RREQ and return NULL.
11310c3a8cd0SMatthew Dillon 		 *
11320c3a8cd0SMatthew Dillon 		 * Leave ioq->msg intact.
11330c3a8cd0SMatthew Dillon 		 * Leave the FIFO intact.
11340c3a8cd0SMatthew Dillon 		 */
1135a2179323SMatthew Dillon 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
11360c3a8cd0SMatthew Dillon 	} else {
11370c3a8cd0SMatthew Dillon 		/*
11380d20ec8aSMatthew Dillon 		 * Continue processing msg.
11390c3a8cd0SMatthew Dillon 		 *
11400c3a8cd0SMatthew Dillon 		 * The fifo has already been advanced past the message.
11410c3a8cd0SMatthew Dillon 		 * Trivially reset the FIFO indices if possible.
11420c3a8cd0SMatthew Dillon 		 *
11430c3a8cd0SMatthew Dillon 		 * clear the FIFO if it is now empty and set RREQ to wait
11440c3a8cd0SMatthew Dillon 		 * for more from the socket.  If the FIFO is not empty set
11450c3a8cd0SMatthew Dillon 		 * TWORK to bypass the poll so we loop immediately.
11460c3a8cd0SMatthew Dillon 		 */
11470c3a8cd0SMatthew Dillon 		if (ioq->fifo_beg == ioq->fifo_cdx &&
11480c3a8cd0SMatthew Dillon 		    ioq->fifo_cdn == ioq->fifo_end) {
1149a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
11500c3a8cd0SMatthew Dillon 			ioq->fifo_cdx = 0;
11510c3a8cd0SMatthew Dillon 			ioq->fifo_cdn = 0;
11520c3a8cd0SMatthew Dillon 			ioq->fifo_beg = 0;
11530c3a8cd0SMatthew Dillon 			ioq->fifo_end = 0;
11540c3a8cd0SMatthew Dillon 		} else {
1155a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
11560c3a8cd0SMatthew Dillon 		}
11570c3a8cd0SMatthew Dillon 		ioq->state = DMSG_MSGQ_STATE_HEADER1;
11580c3a8cd0SMatthew Dillon 		ioq->msg = NULL;
11590d20ec8aSMatthew Dillon 
11600d20ec8aSMatthew Dillon 		/*
11610d20ec8aSMatthew Dillon 		 * Handle message routing.  Validates non-zero sources
11620d20ec8aSMatthew Dillon 		 * and routes message.  Error will be 0 if the message is
11630d20ec8aSMatthew Dillon 		 * destined for us.
11640d20ec8aSMatthew Dillon 		 *
11650d20ec8aSMatthew Dillon 		 * State processing only occurs for messages destined for us.
11660d20ec8aSMatthew Dillon 		 */
1167a2179323SMatthew Dillon 		if (DMsgDebugOpt >= 5) {
1168a2179323SMatthew Dillon 			fprintf(stderr,
1169a2179323SMatthew Dillon 				"rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1170a2179323SMatthew Dillon 				msg->any.head.cmd,
1171a2179323SMatthew Dillon 				(intmax_t)msg->any.head.msgid,
1172a2179323SMatthew Dillon 				(intmax_t)msg->any.head.circuit);
1173a2179323SMatthew Dillon 		}
11740d20ec8aSMatthew Dillon 		error = dmsg_state_msgrx(msg);
11750d20ec8aSMatthew Dillon 
11760d20ec8aSMatthew Dillon 		if (error) {
11770d20ec8aSMatthew Dillon 			/*
11780d20ec8aSMatthew Dillon 			 * Abort-after-closure, throw message away and
11790d20ec8aSMatthew Dillon 			 * start reading another.
11800d20ec8aSMatthew Dillon 			 */
11810d20ec8aSMatthew Dillon 			if (error == DMSG_IOQ_ERROR_EALREADY) {
11820d20ec8aSMatthew Dillon 				dmsg_msg_free(msg);
11830d20ec8aSMatthew Dillon 				goto again;
11840d20ec8aSMatthew Dillon 			}
11850d20ec8aSMatthew Dillon 
11860d20ec8aSMatthew Dillon 			/*
11870d20ec8aSMatthew Dillon 			 * Process real error and throw away message.
11880d20ec8aSMatthew Dillon 			 */
11890d20ec8aSMatthew Dillon 			ioq->error = error;
11900d20ec8aSMatthew Dillon 			goto skip;
11910d20ec8aSMatthew Dillon 		}
11920d20ec8aSMatthew Dillon 		/* no error, not routed.  Fall through and return msg */
11930c3a8cd0SMatthew Dillon 	}
11940c3a8cd0SMatthew Dillon 	return (msg);
11950c3a8cd0SMatthew Dillon }
11960c3a8cd0SMatthew Dillon 
11970c3a8cd0SMatthew Dillon /*
11980c3a8cd0SMatthew Dillon  * Calculate the header and data crc's and write a low-level message to
11990c3a8cd0SMatthew Dillon  * the connection.  If aux_crc is non-zero the aux_data crc is already
12000c3a8cd0SMatthew Dillon  * assumed to have been set.
12010c3a8cd0SMatthew Dillon  *
12020c3a8cd0SMatthew Dillon  * A non-NULL msg is added to the queue but not necessarily flushed.
12030c3a8cd0SMatthew Dillon  * Calling this function with msg == NULL will get a flush going.
12040c3a8cd0SMatthew Dillon  *
1205a2179323SMatthew Dillon  * (called from iocom_core only)
12060c3a8cd0SMatthew Dillon  */
12070c3a8cd0SMatthew Dillon void
12080c3a8cd0SMatthew Dillon dmsg_iocom_flush1(dmsg_iocom_t *iocom)
12090c3a8cd0SMatthew Dillon {
12100c3a8cd0SMatthew Dillon 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
12110c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
12120c3a8cd0SMatthew Dillon 	uint32_t xcrc32;
1213f306de83SMatthew Dillon 	size_t hbytes;
1214f306de83SMatthew Dillon 	size_t abytes;
12150c3a8cd0SMatthew Dillon 	dmsg_msg_queue_t tmpq;
12160c3a8cd0SMatthew Dillon 
1217a2179323SMatthew Dillon 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
12180c3a8cd0SMatthew Dillon 	TAILQ_INIT(&tmpq);
12190c3a8cd0SMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
12200d20ec8aSMatthew Dillon 	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
12210d20ec8aSMatthew Dillon 		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
12220c3a8cd0SMatthew Dillon 		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
12230c3a8cd0SMatthew Dillon 	}
12240c3a8cd0SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
12250c3a8cd0SMatthew Dillon 
12260c3a8cd0SMatthew Dillon 	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
12270c3a8cd0SMatthew Dillon 		/*
12280c3a8cd0SMatthew Dillon 		 * Process terminal connection errors.
12290c3a8cd0SMatthew Dillon 		 */
12300c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&tmpq, msg, qentry);
12310c3a8cd0SMatthew Dillon 		if (ioq->error) {
12320c3a8cd0SMatthew Dillon 			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
12330c3a8cd0SMatthew Dillon 			++ioq->msgcount;
12340c3a8cd0SMatthew Dillon 			continue;
12350c3a8cd0SMatthew Dillon 		}
12360c3a8cd0SMatthew Dillon 
12370c3a8cd0SMatthew Dillon 		/*
12380c3a8cd0SMatthew Dillon 		 * Finish populating the msg fields.  The salt ensures that
12390c3a8cd0SMatthew Dillon 		 * the iv[] array is ridiculously randomized and we also
12400c3a8cd0SMatthew Dillon 		 * re-seed our PRNG every 32768 messages just to be sure.
12410c3a8cd0SMatthew Dillon 		 */
12420c3a8cd0SMatthew Dillon 		msg->any.head.magic = DMSG_HDR_MAGIC;
12430c3a8cd0SMatthew Dillon 		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
12440c3a8cd0SMatthew Dillon 		++ioq->seq;
12450c3a8cd0SMatthew Dillon 		if ((ioq->seq & 32767) == 0)
12460c3a8cd0SMatthew Dillon 			srandomdev();
12470c3a8cd0SMatthew Dillon 
12480c3a8cd0SMatthew Dillon 		/*
12490c3a8cd0SMatthew Dillon 		 * Calculate aux_crc if 0, then calculate hdr_crc.
12500c3a8cd0SMatthew Dillon 		 */
12510c3a8cd0SMatthew Dillon 		if (msg->aux_size && msg->any.head.aux_crc == 0) {
1252f306de83SMatthew Dillon 			abytes = DMSG_DOALIGN(msg->aux_size);
1253f306de83SMatthew Dillon 			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
12540c3a8cd0SMatthew Dillon 			msg->any.head.aux_crc = xcrc32;
12550c3a8cd0SMatthew Dillon 		}
1256f306de83SMatthew Dillon 		msg->any.head.aux_bytes = msg->aux_size;
12570c3a8cd0SMatthew Dillon 
12580c3a8cd0SMatthew Dillon 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
12590c3a8cd0SMatthew Dillon 			 DMSG_ALIGN;
12600c3a8cd0SMatthew Dillon 		msg->any.head.hdr_crc = 0;
12610c3a8cd0SMatthew Dillon 		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
12620c3a8cd0SMatthew Dillon 
12630c3a8cd0SMatthew Dillon 		/*
12640c3a8cd0SMatthew Dillon 		 * Enqueue the message (the flush codes handles stream
12650c3a8cd0SMatthew Dillon 		 * encryption).
12660c3a8cd0SMatthew Dillon 		 */
12670c3a8cd0SMatthew Dillon 		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
12680c3a8cd0SMatthew Dillon 		++ioq->msgcount;
12690c3a8cd0SMatthew Dillon 	}
12700c3a8cd0SMatthew Dillon 	dmsg_iocom_flush2(iocom);
12710c3a8cd0SMatthew Dillon }
12720c3a8cd0SMatthew Dillon 
12730c3a8cd0SMatthew Dillon /*
12740c3a8cd0SMatthew Dillon  * Thread localized, iocom->mtx not held by caller.
1275a2179323SMatthew Dillon  *
1276a2179323SMatthew Dillon  * (called from iocom_core via iocom_flush1 only)
12770c3a8cd0SMatthew Dillon  */
12780c3a8cd0SMatthew Dillon void
12790c3a8cd0SMatthew Dillon dmsg_iocom_flush2(dmsg_iocom_t *iocom)
12800c3a8cd0SMatthew Dillon {
12810c3a8cd0SMatthew Dillon 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
12820c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
12830c3a8cd0SMatthew Dillon 	ssize_t n;
12840c3a8cd0SMatthew Dillon 	struct iovec iov[DMSG_IOQ_MAXIOVEC];
12850c3a8cd0SMatthew Dillon 	size_t nact;
12860c3a8cd0SMatthew Dillon 	size_t hbytes;
12870c3a8cd0SMatthew Dillon 	size_t abytes;
12880c3a8cd0SMatthew Dillon 	size_t hoff;
12890c3a8cd0SMatthew Dillon 	size_t aoff;
12900c3a8cd0SMatthew Dillon 	int iovcnt;
12910c3a8cd0SMatthew Dillon 
12920c3a8cd0SMatthew Dillon 	if (ioq->error) {
12930c3a8cd0SMatthew Dillon 		dmsg_iocom_drain(iocom);
12940c3a8cd0SMatthew Dillon 		return;
12950c3a8cd0SMatthew Dillon 	}
12960c3a8cd0SMatthew Dillon 
12970c3a8cd0SMatthew Dillon 	/*
12980c3a8cd0SMatthew Dillon 	 * Pump messages out the connection by building an iovec.
12990c3a8cd0SMatthew Dillon 	 *
13000c3a8cd0SMatthew Dillon 	 * ioq->hbytes/ioq->abytes tracks how much of the first message
13010c3a8cd0SMatthew Dillon 	 * in the queue has been successfully written out, so we can
13020c3a8cd0SMatthew Dillon 	 * resume writing.
13030c3a8cd0SMatthew Dillon 	 */
13040c3a8cd0SMatthew Dillon 	iovcnt = 0;
13050c3a8cd0SMatthew Dillon 	nact = 0;
13060c3a8cd0SMatthew Dillon 	hoff = ioq->hbytes;
13070c3a8cd0SMatthew Dillon 	aoff = ioq->abytes;
13080c3a8cd0SMatthew Dillon 
13090c3a8cd0SMatthew Dillon 	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
13100c3a8cd0SMatthew Dillon 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
13110c3a8cd0SMatthew Dillon 			 DMSG_ALIGN;
13128d6d37b8SMatthew Dillon 		abytes = DMSG_DOALIGN(msg->aux_size);
13130c3a8cd0SMatthew Dillon 		assert(hoff <= hbytes && aoff <= abytes);
13140c3a8cd0SMatthew Dillon 
13150c3a8cd0SMatthew Dillon 		if (hoff < hbytes) {
1316*024de405SMatthew Dillon 			size_t maxlen = hbytes - hoff;
1317*024de405SMatthew Dillon 			if (maxlen > sizeof(ioq->buf) / 2)
1318*024de405SMatthew Dillon 				maxlen = sizeof(ioq->buf) / 2;
13190c3a8cd0SMatthew Dillon 			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1320*024de405SMatthew Dillon 			iov[iovcnt].iov_len = maxlen;
1321*024de405SMatthew Dillon 			nact += maxlen;
13220c3a8cd0SMatthew Dillon 			++iovcnt;
1323*024de405SMatthew Dillon 			if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1324*024de405SMatthew Dillon 			    maxlen != hbytes - hoff) {
13250c3a8cd0SMatthew Dillon 				break;
13260c3a8cd0SMatthew Dillon 			}
1327*024de405SMatthew Dillon 		}
13280c3a8cd0SMatthew Dillon 		if (aoff < abytes) {
1329*024de405SMatthew Dillon 			size_t maxlen = abytes - aoff;
1330*024de405SMatthew Dillon 			if (maxlen > sizeof(ioq->buf) / 2)
1331*024de405SMatthew Dillon 				maxlen = sizeof(ioq->buf) / 2;
1332*024de405SMatthew Dillon 
13330c3a8cd0SMatthew Dillon 			assert(msg->aux_data != NULL);
13340c3a8cd0SMatthew Dillon 			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1335*024de405SMatthew Dillon 			iov[iovcnt].iov_len = maxlen;
1336*024de405SMatthew Dillon 			nact += maxlen;
13370c3a8cd0SMatthew Dillon 			++iovcnt;
1338*024de405SMatthew Dillon 			if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1339*024de405SMatthew Dillon 			    maxlen != abytes - aoff) {
13400c3a8cd0SMatthew Dillon 				break;
13410c3a8cd0SMatthew Dillon 			}
1342*024de405SMatthew Dillon 		}
13430c3a8cd0SMatthew Dillon 		hoff = 0;
13440c3a8cd0SMatthew Dillon 		aoff = 0;
13450c3a8cd0SMatthew Dillon 	}
13460c3a8cd0SMatthew Dillon 	if (iovcnt == 0)
13470c3a8cd0SMatthew Dillon 		return;
13480c3a8cd0SMatthew Dillon 
13490c3a8cd0SMatthew Dillon 	/*
13500c3a8cd0SMatthew Dillon 	 * Encrypt and write the data.  The crypto code will move the
13510c3a8cd0SMatthew Dillon 	 * data into the fifo and adjust the iov as necessary.  If
13520c3a8cd0SMatthew Dillon 	 * encryption is disabled the iov is left alone.
13530c3a8cd0SMatthew Dillon 	 *
13540c3a8cd0SMatthew Dillon 	 * May return a smaller iov (thus a smaller n), with aggregated
13550c3a8cd0SMatthew Dillon 	 * chunks.  May reduce nmax to what fits in the FIFO.
13560c3a8cd0SMatthew Dillon 	 *
13570c3a8cd0SMatthew Dillon 	 * This function sets nact to the number of original bytes now
13580c3a8cd0SMatthew Dillon 	 * encrypted, adding to the FIFO some number of bytes that might
13590c3a8cd0SMatthew Dillon 	 * be greater depending on the crypto mechanic.  iov[] is adjusted
13600c3a8cd0SMatthew Dillon 	 * to point at the FIFO if necessary.
13610c3a8cd0SMatthew Dillon 	 *
13620c3a8cd0SMatthew Dillon 	 * NOTE: The return value from the writev() is the post-encrypted
13630c3a8cd0SMatthew Dillon 	 *	 byte count, not the plaintext count.
13640c3a8cd0SMatthew Dillon 	 */
13650c3a8cd0SMatthew Dillon 	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
13660c3a8cd0SMatthew Dillon 		/*
13670c3a8cd0SMatthew Dillon 		 * Make sure the FIFO has a reasonable amount of space
13680c3a8cd0SMatthew Dillon 		 * left (if not completely full).
1369a2179323SMatthew Dillon 		 *
1370a2179323SMatthew Dillon 		 * In this situation we are staging the encrypted message
1371a2179323SMatthew Dillon 		 * data in the FIFO.  (nact) represents how much plaintext
1372a2179323SMatthew Dillon 		 * has been staged, (n) represents how much encrypted data
1373a2179323SMatthew Dillon 		 * has been flushed.  The two are independent of each other.
13740c3a8cd0SMatthew Dillon 		 */
13750c3a8cd0SMatthew Dillon 		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1376a2179323SMatthew Dillon 		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
13770c3a8cd0SMatthew Dillon 			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
13780c3a8cd0SMatthew Dillon 			      ioq->fifo_end - ioq->fifo_beg);
13790c3a8cd0SMatthew Dillon 			ioq->fifo_cdx -= ioq->fifo_beg;
13800c3a8cd0SMatthew Dillon 			ioq->fifo_cdn -= ioq->fifo_beg;
13810c3a8cd0SMatthew Dillon 			ioq->fifo_end -= ioq->fifo_beg;
13820c3a8cd0SMatthew Dillon 			ioq->fifo_beg = 0;
13830c3a8cd0SMatthew Dillon 		}
13840c3a8cd0SMatthew Dillon 
13850c3a8cd0SMatthew Dillon 		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
13860c3a8cd0SMatthew Dillon 		n = writev(iocom->sock_fd, iov, iovcnt);
13870c3a8cd0SMatthew Dillon 		if (n > 0) {
13880c3a8cd0SMatthew Dillon 			ioq->fifo_beg += n;
13890c3a8cd0SMatthew Dillon 			ioq->fifo_cdn += n;
13900c3a8cd0SMatthew Dillon 			ioq->fifo_cdx += n;
13910c3a8cd0SMatthew Dillon 			if (ioq->fifo_beg == ioq->fifo_end) {
13920c3a8cd0SMatthew Dillon 				ioq->fifo_beg = 0;
13930c3a8cd0SMatthew Dillon 				ioq->fifo_cdn = 0;
13940c3a8cd0SMatthew Dillon 				ioq->fifo_cdx = 0;
13950c3a8cd0SMatthew Dillon 				ioq->fifo_end = 0;
13960c3a8cd0SMatthew Dillon 			}
13970c3a8cd0SMatthew Dillon 		}
1398a2179323SMatthew Dillon 		/*
1399a2179323SMatthew Dillon 		 * We don't mess with the nact returned by the crypto_encrypt
1400a2179323SMatthew Dillon 		 * call, which represents the filling of the FIFO.  (n) tells
1401a2179323SMatthew Dillon 		 * us how much we were able to write from the FIFO.  The two
1402a2179323SMatthew Dillon 		 * are different beasts when encrypting.
1403a2179323SMatthew Dillon 		 */
14040c3a8cd0SMatthew Dillon 	} else {
1405a2179323SMatthew Dillon 		/*
1406a2179323SMatthew Dillon 		 * In this situation we are not staging the messages to the
1407a2179323SMatthew Dillon 		 * FIFO but instead writing them directly from the msg
1408a2179323SMatthew Dillon 		 * structure(s), so (nact) is basically (n).
1409a2179323SMatthew Dillon 		 */
14100c3a8cd0SMatthew Dillon 		n = writev(iocom->sock_fd, iov, iovcnt);
14110c3a8cd0SMatthew Dillon 		if (n > 0)
14120c3a8cd0SMatthew Dillon 			nact = n;
14130c3a8cd0SMatthew Dillon 		else
14140c3a8cd0SMatthew Dillon 			nact = 0;
14150c3a8cd0SMatthew Dillon 	}
14160c3a8cd0SMatthew Dillon 
14170c3a8cd0SMatthew Dillon 	/*
14180c3a8cd0SMatthew Dillon 	 * Clean out the transmit queue based on what we successfully
14190c3a8cd0SMatthew Dillon 	 * sent (nact is the plaintext count).  ioq->hbytes/abytes
14200c3a8cd0SMatthew Dillon 	 * represents the portion of the first message previously sent.
14210c3a8cd0SMatthew Dillon 	 */
14220c3a8cd0SMatthew Dillon 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
14230c3a8cd0SMatthew Dillon 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
14240c3a8cd0SMatthew Dillon 			 DMSG_ALIGN;
14258d6d37b8SMatthew Dillon 		abytes = DMSG_DOALIGN(msg->aux_size);
14260c3a8cd0SMatthew Dillon 
14270c3a8cd0SMatthew Dillon 		if ((size_t)nact < hbytes - ioq->hbytes) {
14280c3a8cd0SMatthew Dillon 			ioq->hbytes += nact;
14290c3a8cd0SMatthew Dillon 			nact = 0;
14300c3a8cd0SMatthew Dillon 			break;
14310c3a8cd0SMatthew Dillon 		}
14320c3a8cd0SMatthew Dillon 		nact -= hbytes - ioq->hbytes;
14330c3a8cd0SMatthew Dillon 		ioq->hbytes = hbytes;
14340c3a8cd0SMatthew Dillon 		if ((size_t)nact < abytes - ioq->abytes) {
14350c3a8cd0SMatthew Dillon 			ioq->abytes += nact;
14360c3a8cd0SMatthew Dillon 			nact = 0;
14370c3a8cd0SMatthew Dillon 			break;
14380c3a8cd0SMatthew Dillon 		}
14390c3a8cd0SMatthew Dillon 		nact -= abytes - ioq->abytes;
1440a2179323SMatthew Dillon 		/* ioq->abytes = abytes; optimized out */
1441a2179323SMatthew Dillon 
14421b8eded1SMatthew Dillon #if 0
1443a2179323SMatthew Dillon 		fprintf(stderr,
1444a2179323SMatthew Dillon 			"txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1445a2179323SMatthew Dillon 			msg->any.head.cmd,
1446a2179323SMatthew Dillon 			(intmax_t)msg->any.head.msgid,
1447a2179323SMatthew Dillon 			(intmax_t)msg->any.head.circuit);
14481b8eded1SMatthew Dillon #endif
14490c3a8cd0SMatthew Dillon 
14500c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
14510c3a8cd0SMatthew Dillon 		--ioq->msgcount;
14520c3a8cd0SMatthew Dillon 		ioq->hbytes = 0;
14530c3a8cd0SMatthew Dillon 		ioq->abytes = 0;
1454323c0947SMatthew Dillon 		dmsg_msg_free(msg);
14550c3a8cd0SMatthew Dillon 	}
14560c3a8cd0SMatthew Dillon 	assert(nact == 0);
14570c3a8cd0SMatthew Dillon 
14580c3a8cd0SMatthew Dillon 	/*
14590c3a8cd0SMatthew Dillon 	 * Process the return value from the write w/regards to blocking.
14600c3a8cd0SMatthew Dillon 	 */
14610c3a8cd0SMatthew Dillon 	if (n < 0) {
14620c3a8cd0SMatthew Dillon 		if (errno != EINTR &&
14630c3a8cd0SMatthew Dillon 		    errno != EINPROGRESS &&
14640c3a8cd0SMatthew Dillon 		    errno != EAGAIN) {
14650c3a8cd0SMatthew Dillon 			/*
14660c3a8cd0SMatthew Dillon 			 * Fatal write error
14670c3a8cd0SMatthew Dillon 			 */
14680c3a8cd0SMatthew Dillon 			ioq->error = DMSG_IOQ_ERROR_SOCK;
14690c3a8cd0SMatthew Dillon 			dmsg_iocom_drain(iocom);
14700c3a8cd0SMatthew Dillon 		} else {
14710c3a8cd0SMatthew Dillon 			/*
14720c3a8cd0SMatthew Dillon 			 * Wait for socket buffer space
14730c3a8cd0SMatthew Dillon 			 */
1474a2179323SMatthew Dillon 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
14750c3a8cd0SMatthew Dillon 		}
14760c3a8cd0SMatthew Dillon 	} else {
1477a2179323SMatthew Dillon 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
14780c3a8cd0SMatthew Dillon 	}
14790c3a8cd0SMatthew Dillon 	if (ioq->error) {
14800c3a8cd0SMatthew Dillon 		dmsg_iocom_drain(iocom);
14810c3a8cd0SMatthew Dillon 	}
14820c3a8cd0SMatthew Dillon }
14830c3a8cd0SMatthew Dillon 
14840c3a8cd0SMatthew Dillon /*
14850c3a8cd0SMatthew Dillon  * Kill pending msgs on ioq_tx and adjust the flags such that no more
14860c3a8cd0SMatthew Dillon  * write events will occur.  We don't kill read msgs because we want
14870c3a8cd0SMatthew Dillon  * the caller to pull off our contrived terminal error msg to detect
14880c3a8cd0SMatthew Dillon  * the connection failure.
14890c3a8cd0SMatthew Dillon  *
1490a2179323SMatthew Dillon  * Localized to iocom_core thread, iocom->mtx not held by caller.
14910c3a8cd0SMatthew Dillon  */
14920c3a8cd0SMatthew Dillon void
14930c3a8cd0SMatthew Dillon dmsg_iocom_drain(dmsg_iocom_t *iocom)
14940c3a8cd0SMatthew Dillon {
14950c3a8cd0SMatthew Dillon 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
14960c3a8cd0SMatthew Dillon 	dmsg_msg_t *msg;
14970c3a8cd0SMatthew Dillon 
1498a2179323SMatthew Dillon 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
14990c3a8cd0SMatthew Dillon 	ioq->hbytes = 0;
15000c3a8cd0SMatthew Dillon 	ioq->abytes = 0;
15010c3a8cd0SMatthew Dillon 
15020c3a8cd0SMatthew Dillon 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
15030c3a8cd0SMatthew Dillon 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
15040c3a8cd0SMatthew Dillon 		--ioq->msgcount;
1505323c0947SMatthew Dillon 		dmsg_msg_free(msg);
15060c3a8cd0SMatthew Dillon 	}
15070c3a8cd0SMatthew Dillon }
15080c3a8cd0SMatthew Dillon 
15090c3a8cd0SMatthew Dillon /*
15100c3a8cd0SMatthew Dillon  * Write a message to an iocom, with additional state processing.
15110c3a8cd0SMatthew Dillon  */
15120c3a8cd0SMatthew Dillon void
15130c3a8cd0SMatthew Dillon dmsg_msg_write(dmsg_msg_t *msg)
15140c3a8cd0SMatthew Dillon {
15151b8eded1SMatthew Dillon 	dmsg_iocom_t *iocom = msg->state->iocom;
15160c3a8cd0SMatthew Dillon 	dmsg_state_t *state;
15170c3a8cd0SMatthew Dillon 	char dummy;
15180c3a8cd0SMatthew Dillon 
15190c3a8cd0SMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
15201b8eded1SMatthew Dillon 	state = msg->state;
1521d30cab67SMatthew Dillon 
1522323c0947SMatthew Dillon 	/*
1523323c0947SMatthew Dillon 	 * Make sure the parent transaction is still open in the transmit
1524323c0947SMatthew Dillon 	 * direction.  If it isn't the message is dead and we have to
1525323c0947SMatthew Dillon 	 * potentially simulate a rxmsg terminating the transaction.
1526323c0947SMatthew Dillon 	 */
1527323c0947SMatthew Dillon 	if (state->parent->txcmd & DMSGF_DELETE) {
1528323c0947SMatthew Dillon 		fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1529323c0947SMatthew Dillon 		dmsg_msg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1530323c0947SMatthew Dillon 		dmsg_state_cleanuptx(iocom, msg);
1531323c0947SMatthew Dillon 		dmsg_msg_free(msg);
1532323c0947SMatthew Dillon 		pthread_mutex_unlock(&iocom->mtx);
1533323c0947SMatthew Dillon 		return;
1534323c0947SMatthew Dillon 	}
1535323c0947SMatthew Dillon 
1536323c0947SMatthew Dillon 	/*
1537323c0947SMatthew Dillon 	 * Process state data into the message as needed, then update the
1538323c0947SMatthew Dillon 	 * state based on the message.
1539323c0947SMatthew Dillon 	 */
1540d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
15410c3a8cd0SMatthew Dillon 		/*
15420c3a8cd0SMatthew Dillon 		 * Existing transaction (could be reply).  It is also
15430c3a8cd0SMatthew Dillon 		 * possible for this to be the first reply (CREATE is set),
15440c3a8cd0SMatthew Dillon 		 * in which case we populate state->txcmd.
15450c3a8cd0SMatthew Dillon 		 *
15460c3a8cd0SMatthew Dillon 		 * state->txcmd is adjusted to hold the final message cmd,
15470c3a8cd0SMatthew Dillon 		 * and we also be sure to set the CREATE bit here.  We did
15480c3a8cd0SMatthew Dillon 		 * not set it in dmsg_msg_alloc() because that would have
15490c3a8cd0SMatthew Dillon 		 * not been serialized (state could have gotten ripped out
15500c3a8cd0SMatthew Dillon 		 * from under the message prior to it being transmitted).
15510c3a8cd0SMatthew Dillon 		 */
15520c3a8cd0SMatthew Dillon 		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
15530c3a8cd0SMatthew Dillon 		    DMSGF_CREATE) {
15540c3a8cd0SMatthew Dillon 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
15550d20ec8aSMatthew Dillon 			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
15560c3a8cd0SMatthew Dillon 		}
15570c3a8cd0SMatthew Dillon 		msg->any.head.msgid = state->msgid;
15581b8eded1SMatthew Dillon 
15590d20ec8aSMatthew Dillon 		if (msg->any.head.cmd & DMSGF_CREATE) {
15600c3a8cd0SMatthew Dillon 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
15610c3a8cd0SMatthew Dillon 		}
15620d20ec8aSMatthew Dillon 	}
1563323c0947SMatthew Dillon 	dmsg_state_cleanuptx(iocom, msg);
15640c3a8cd0SMatthew Dillon 
15651b8eded1SMatthew Dillon #if 0
15661b8eded1SMatthew Dillon 	fprintf(stderr,
15671b8eded1SMatthew Dillon 		"MSGWRITE %016jx %08x\n",
15681b8eded1SMatthew Dillon 		msg->any.head.msgid, msg->any.head.cmd);
15691b8eded1SMatthew Dillon #endif
15701b8eded1SMatthew Dillon 
15710c3a8cd0SMatthew Dillon 	/*
15720c3a8cd0SMatthew Dillon 	 * Queue it for output, wake up the I/O pthread.  Note that the
15730c3a8cd0SMatthew Dillon 	 * I/O thread is responsible for generating the CRCs and encryption.
15740c3a8cd0SMatthew Dillon 	 */
15750d20ec8aSMatthew Dillon 	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
15760c3a8cd0SMatthew Dillon 	dummy = 0;
15770c3a8cd0SMatthew Dillon 	write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
15780c3a8cd0SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
15790c3a8cd0SMatthew Dillon }
15800c3a8cd0SMatthew Dillon 
15810c3a8cd0SMatthew Dillon /*
1582323c0947SMatthew Dillon  * iocom->mtx must be held by caller.
1583323c0947SMatthew Dillon  */
1584323c0947SMatthew Dillon static
1585323c0947SMatthew Dillon void
1586323c0947SMatthew Dillon dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
1587323c0947SMatthew Dillon {
1588323c0947SMatthew Dillon 	dmsg_iocom_t *iocom = state->iocom;
1589323c0947SMatthew Dillon 	dmsg_msg_t *msg;
1590323c0947SMatthew Dillon 
1591323c0947SMatthew Dillon 	msg = NULL;
1592323c0947SMatthew Dillon 
1593323c0947SMatthew Dillon 	if (state == &iocom->state0) {
1594323c0947SMatthew Dillon 		/*
1595323c0947SMatthew Dillon 		 * No active local or remote transactions remain.
1596323c0947SMatthew Dillon 		 * Generate a final LNK_ERROR and flag EOF.
1597323c0947SMatthew Dillon 		 */
1598323c0947SMatthew Dillon 		msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1599323c0947SMatthew Dillon 					    DMSG_LNK_ERROR,
1600323c0947SMatthew Dillon 					    NULL, NULL);
1601323c0947SMatthew Dillon 		msg->any.head.error = error;
1602323c0947SMatthew Dillon 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1603323c0947SMatthew Dillon 		fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1604323c0947SMatthew Dillon 	} else if (state->flags & DMSG_STATE_OPPOSITE) {
1605323c0947SMatthew Dillon 		/*
1606323c0947SMatthew Dillon 		 * Active remote transactions are still present.
1607323c0947SMatthew Dillon 		 * Simulate the other end sending us a DELETE.
1608323c0947SMatthew Dillon 		 */
1609323c0947SMatthew Dillon 		if (state->rxcmd & DMSGF_DELETE) {
1610323c0947SMatthew Dillon 			fprintf(stderr,
1611323c0947SMatthew Dillon 				"iocom: ioq error(rd) %d sleeping "
1612323c0947SMatthew Dillon 				"state %p rxcmd %08x txcmd %08x "
1613323c0947SMatthew Dillon 				"func %p\n",
1614323c0947SMatthew Dillon 				error, state, state->rxcmd,
1615323c0947SMatthew Dillon 				state->txcmd, state->func);
1616323c0947SMatthew Dillon 			usleep(100000);	/* XXX */
1617323c0947SMatthew Dillon 			atomic_set_int(&iocom->flags,
1618323c0947SMatthew Dillon 				       DMSG_IOCOMF_RWORK);
1619323c0947SMatthew Dillon 		} else {
1620323c0947SMatthew Dillon 			fprintf(stderr, "SIMULATE ERROR1\n");
1621323c0947SMatthew Dillon 			msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1622323c0947SMatthew Dillon 					     DMSG_LNK_ERROR,
1623323c0947SMatthew Dillon 					     NULL, NULL);
1624323c0947SMatthew Dillon 			/*state->txcmd |= DMSGF_DELETE;*/
1625323c0947SMatthew Dillon 			msg->state = state;
1626323c0947SMatthew Dillon 			msg->any.head.error = error;
1627323c0947SMatthew Dillon 			msg->any.head.msgid = state->msgid;
1628323c0947SMatthew Dillon 			msg->any.head.circuit = state->parent->msgid;
1629323c0947SMatthew Dillon 			msg->any.head.cmd |= DMSGF_ABORT |
1630323c0947SMatthew Dillon 					     DMSGF_DELETE;
1631323c0947SMatthew Dillon 			if ((state->parent->flags &
1632323c0947SMatthew Dillon 			     DMSG_STATE_OPPOSITE) == 0) {
1633323c0947SMatthew Dillon 				msg->any.head.cmd |= DMSGF_REVCIRC;
1634323c0947SMatthew Dillon 			}
1635323c0947SMatthew Dillon 		}
1636323c0947SMatthew Dillon 	} else {
1637323c0947SMatthew Dillon 		/*
1638323c0947SMatthew Dillon 		 * Active local transactions are still present.
1639323c0947SMatthew Dillon 		 * Simulate the other end sending us a DELETE.
1640323c0947SMatthew Dillon 		 */
1641323c0947SMatthew Dillon 		if (state->rxcmd & DMSGF_DELETE) {
1642323c0947SMatthew Dillon 			fprintf(stderr,
1643323c0947SMatthew Dillon 				"iocom: ioq error(wr) %d sleeping "
1644323c0947SMatthew Dillon 				"state %p rxcmd %08x txcmd %08x "
1645323c0947SMatthew Dillon 				"func %p\n",
1646323c0947SMatthew Dillon 				error, state, state->rxcmd,
1647323c0947SMatthew Dillon 				state->txcmd, state->func);
1648323c0947SMatthew Dillon 			usleep(100000);	/* XXX */
1649323c0947SMatthew Dillon 			atomic_set_int(&iocom->flags,
1650323c0947SMatthew Dillon 				       DMSG_IOCOMF_RWORK);
1651323c0947SMatthew Dillon 		} else {
1652323c0947SMatthew Dillon 			fprintf(stderr, "SIMULATE ERROR1\n");
1653323c0947SMatthew Dillon 			msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1654323c0947SMatthew Dillon 					     DMSG_LNK_ERROR,
1655323c0947SMatthew Dillon 					     NULL, NULL);
1656323c0947SMatthew Dillon 			msg->state = state;
1657323c0947SMatthew Dillon 			msg->any.head.error = error;
1658323c0947SMatthew Dillon 			msg->any.head.msgid = state->msgid;
1659323c0947SMatthew Dillon 			msg->any.head.circuit = state->parent->msgid;
1660323c0947SMatthew Dillon 			msg->any.head.cmd |= DMSGF_ABORT |
1661323c0947SMatthew Dillon 					     DMSGF_DELETE |
1662323c0947SMatthew Dillon 					     DMSGF_REVTRANS |
1663323c0947SMatthew Dillon 					     DMSGF_REPLY;
1664323c0947SMatthew Dillon 			if ((state->parent->flags &
1665323c0947SMatthew Dillon 			     DMSG_STATE_OPPOSITE) == 0) {
1666323c0947SMatthew Dillon 				msg->any.head.cmd |= DMSGF_REVCIRC;
1667323c0947SMatthew Dillon 			}
1668323c0947SMatthew Dillon 			if ((state->rxcmd & DMSGF_CREATE) == 0)
1669323c0947SMatthew Dillon 				msg->any.head.cmd |= DMSGF_CREATE;
1670323c0947SMatthew Dillon 		}
1671323c0947SMatthew Dillon 	}
1672323c0947SMatthew Dillon 	if (msg) {
1673323c0947SMatthew Dillon 		TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1674323c0947SMatthew Dillon 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1675323c0947SMatthew Dillon 	}
1676323c0947SMatthew Dillon }
1677323c0947SMatthew Dillon 
1678323c0947SMatthew Dillon /*
16790c3a8cd0SMatthew Dillon  * This is a shortcut to formulate a reply to msg with a simple error code,
16800c3a8cd0SMatthew Dillon  * It can reply to and terminate a transaction, or it can reply to a one-way
16810c3a8cd0SMatthew Dillon  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
16820c3a8cd0SMatthew Dillon  * the error code (which can be 0).  Not all transactions are terminated
16830c3a8cd0SMatthew Dillon  * with DMSG_LNK_ERROR status (the low level only cares about the
16840c3a8cd0SMatthew Dillon  * MSGF_DELETE flag), but most are.
16850c3a8cd0SMatthew Dillon  *
16860c3a8cd0SMatthew Dillon  * Replies to one-way messages are a bit of an oxymoron but the feature
16870c3a8cd0SMatthew Dillon  * is used by the debug (DBG) protocol.
16880c3a8cd0SMatthew Dillon  *
16890c3a8cd0SMatthew Dillon  * The reply contains no extended data.
16900c3a8cd0SMatthew Dillon  */
16910c3a8cd0SMatthew Dillon void
16920c3a8cd0SMatthew Dillon dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
16930c3a8cd0SMatthew Dillon {
16940c3a8cd0SMatthew Dillon 	dmsg_state_t *state = msg->state;
16950c3a8cd0SMatthew Dillon 	dmsg_msg_t *nmsg;
16960c3a8cd0SMatthew Dillon 	uint32_t cmd;
16970c3a8cd0SMatthew Dillon 
16980c3a8cd0SMatthew Dillon 	/*
16990c3a8cd0SMatthew Dillon 	 * Reply with a simple error code and terminate the transaction.
17000c3a8cd0SMatthew Dillon 	 */
17010c3a8cd0SMatthew Dillon 	cmd = DMSG_LNK_ERROR;
17020c3a8cd0SMatthew Dillon 
17030c3a8cd0SMatthew Dillon 	/*
17040c3a8cd0SMatthew Dillon 	 * Check if our direction has even been initiated yet, set CREATE.
17050c3a8cd0SMatthew Dillon 	 *
17060c3a8cd0SMatthew Dillon 	 * Check what direction this is (command or reply direction).  Note
17070c3a8cd0SMatthew Dillon 	 * that txcmd might not have been initiated yet.
17080c3a8cd0SMatthew Dillon 	 *
17090c3a8cd0SMatthew Dillon 	 * If our direction has already been closed we just return without
17100c3a8cd0SMatthew Dillon 	 * doing anything.
17110c3a8cd0SMatthew Dillon 	 */
1712d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
17130c3a8cd0SMatthew Dillon 		if (state->txcmd & DMSGF_DELETE)
17140c3a8cd0SMatthew Dillon 			return;
17150c3a8cd0SMatthew Dillon 		if (state->txcmd & DMSGF_REPLY)
17160c3a8cd0SMatthew Dillon 			cmd |= DMSGF_REPLY;
17170c3a8cd0SMatthew Dillon 		cmd |= DMSGF_DELETE;
17180c3a8cd0SMatthew Dillon 	} else {
17190c3a8cd0SMatthew Dillon 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
17200c3a8cd0SMatthew Dillon 			cmd |= DMSGF_REPLY;
17210c3a8cd0SMatthew Dillon 	}
17220c3a8cd0SMatthew Dillon 
17230c3a8cd0SMatthew Dillon 	/*
17240c3a8cd0SMatthew Dillon 	 * Allocate the message and associate it with the existing state.
17250d20ec8aSMatthew Dillon 	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
17260c3a8cd0SMatthew Dillon 	 * allocate new state.  We have our state already.
17270c3a8cd0SMatthew Dillon 	 */
17281b8eded1SMatthew Dillon 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1729d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
17300c3a8cd0SMatthew Dillon 		if ((state->txcmd & DMSGF_CREATE) == 0)
17310c3a8cd0SMatthew Dillon 			nmsg->any.head.cmd |= DMSGF_CREATE;
17320c3a8cd0SMatthew Dillon 	}
17330c3a8cd0SMatthew Dillon 	nmsg->any.head.error = error;
17341b8eded1SMatthew Dillon 
17350c3a8cd0SMatthew Dillon 	dmsg_msg_write(nmsg);
17360c3a8cd0SMatthew Dillon }
17370c3a8cd0SMatthew Dillon 
17380c3a8cd0SMatthew Dillon /*
17390c3a8cd0SMatthew Dillon  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
17400c3a8cd0SMatthew Dillon  * we are generating a streaming reply or an intermediate acknowledgement
17410c3a8cd0SMatthew Dillon  * of some sort as part of the higher level protocol, with more to come
17420c3a8cd0SMatthew Dillon  * later.
17430c3a8cd0SMatthew Dillon  */
17440c3a8cd0SMatthew Dillon void
17450c3a8cd0SMatthew Dillon dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
17460c3a8cd0SMatthew Dillon {
17470c3a8cd0SMatthew Dillon 	dmsg_state_t *state = msg->state;
17480c3a8cd0SMatthew Dillon 	dmsg_msg_t *nmsg;
17490c3a8cd0SMatthew Dillon 	uint32_t cmd;
17500c3a8cd0SMatthew Dillon 
17510c3a8cd0SMatthew Dillon 
17520c3a8cd0SMatthew Dillon 	/*
17530c3a8cd0SMatthew Dillon 	 * Reply with a simple error code and terminate the transaction.
17540c3a8cd0SMatthew Dillon 	 */
17550c3a8cd0SMatthew Dillon 	cmd = DMSG_LNK_ERROR;
17560c3a8cd0SMatthew Dillon 
17570c3a8cd0SMatthew Dillon 	/*
17580c3a8cd0SMatthew Dillon 	 * Check if our direction has even been initiated yet, set CREATE.
17590c3a8cd0SMatthew Dillon 	 *
17600c3a8cd0SMatthew Dillon 	 * Check what direction this is (command or reply direction).  Note
17610c3a8cd0SMatthew Dillon 	 * that txcmd might not have been initiated yet.
17620c3a8cd0SMatthew Dillon 	 *
17630c3a8cd0SMatthew Dillon 	 * If our direction has already been closed we just return without
17640c3a8cd0SMatthew Dillon 	 * doing anything.
17650c3a8cd0SMatthew Dillon 	 */
1766d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
17670c3a8cd0SMatthew Dillon 		if (state->txcmd & DMSGF_DELETE)
17680c3a8cd0SMatthew Dillon 			return;
17690c3a8cd0SMatthew Dillon 		if (state->txcmd & DMSGF_REPLY)
17700c3a8cd0SMatthew Dillon 			cmd |= DMSGF_REPLY;
17710c3a8cd0SMatthew Dillon 		/* continuing transaction, do not set MSGF_DELETE */
17720c3a8cd0SMatthew Dillon 	} else {
17730c3a8cd0SMatthew Dillon 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
17740c3a8cd0SMatthew Dillon 			cmd |= DMSGF_REPLY;
17750c3a8cd0SMatthew Dillon 	}
17761b8eded1SMatthew Dillon 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1777d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
17780c3a8cd0SMatthew Dillon 		if ((state->txcmd & DMSGF_CREATE) == 0)
17790c3a8cd0SMatthew Dillon 			nmsg->any.head.cmd |= DMSGF_CREATE;
17800c3a8cd0SMatthew Dillon 	}
17810c3a8cd0SMatthew Dillon 	nmsg->any.head.error = error;
17821b8eded1SMatthew Dillon 
17830c3a8cd0SMatthew Dillon 	dmsg_msg_write(nmsg);
17840c3a8cd0SMatthew Dillon }
17850c3a8cd0SMatthew Dillon 
17860c3a8cd0SMatthew Dillon /*
17870c3a8cd0SMatthew Dillon  * Terminate a transaction given a state structure by issuing a DELETE.
17881b8eded1SMatthew Dillon  * (the state structure must not be &iocom->state0)
17890c3a8cd0SMatthew Dillon  */
17900c3a8cd0SMatthew Dillon void
17910c3a8cd0SMatthew Dillon dmsg_state_reply(dmsg_state_t *state, uint32_t error)
17920c3a8cd0SMatthew Dillon {
17930c3a8cd0SMatthew Dillon 	dmsg_msg_t *nmsg;
17940c3a8cd0SMatthew Dillon 	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
17950c3a8cd0SMatthew Dillon 
17960c3a8cd0SMatthew Dillon 	/*
17970c3a8cd0SMatthew Dillon 	 * Nothing to do if we already transmitted a delete
17980c3a8cd0SMatthew Dillon 	 */
17990c3a8cd0SMatthew Dillon 	if (state->txcmd & DMSGF_DELETE)
18000c3a8cd0SMatthew Dillon 		return;
18010c3a8cd0SMatthew Dillon 
18020c3a8cd0SMatthew Dillon 	/*
18030c3a8cd0SMatthew Dillon 	 * Set REPLY if the other end initiated the command.  Otherwise
18040c3a8cd0SMatthew Dillon 	 * we are the command direction.
18050c3a8cd0SMatthew Dillon 	 */
18060c3a8cd0SMatthew Dillon 	if (state->txcmd & DMSGF_REPLY)
18070c3a8cd0SMatthew Dillon 		cmd |= DMSGF_REPLY;
18080c3a8cd0SMatthew Dillon 
18091b8eded1SMatthew Dillon 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1810d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
18110c3a8cd0SMatthew Dillon 		if ((state->txcmd & DMSGF_CREATE) == 0)
18120c3a8cd0SMatthew Dillon 			nmsg->any.head.cmd |= DMSGF_CREATE;
18130c3a8cd0SMatthew Dillon 	}
18140c3a8cd0SMatthew Dillon 	nmsg->any.head.error = error;
18150d20ec8aSMatthew Dillon 	dmsg_msg_write(nmsg);
18160d20ec8aSMatthew Dillon }
18170d20ec8aSMatthew Dillon 
18180d20ec8aSMatthew Dillon /*
18190d20ec8aSMatthew Dillon  * Send a LNK_ERROR result on a transaction WITHOUT issuing a DELETE.
18201b8eded1SMatthew Dillon  * (the state structure must not be &iocom->state0)
18210d20ec8aSMatthew Dillon  */
18220d20ec8aSMatthew Dillon void
18230d20ec8aSMatthew Dillon dmsg_state_result(dmsg_state_t *state, uint32_t error)
18240d20ec8aSMatthew Dillon {
18250d20ec8aSMatthew Dillon 	dmsg_msg_t *nmsg;
18260d20ec8aSMatthew Dillon 	uint32_t cmd = DMSG_LNK_ERROR;
18270d20ec8aSMatthew Dillon 
18280d20ec8aSMatthew Dillon 	/*
18290d20ec8aSMatthew Dillon 	 * Nothing to do if we already transmitted a delete
18300d20ec8aSMatthew Dillon 	 */
18310d20ec8aSMatthew Dillon 	if (state->txcmd & DMSGF_DELETE)
18320d20ec8aSMatthew Dillon 		return;
18330d20ec8aSMatthew Dillon 
18340d20ec8aSMatthew Dillon 	/*
18350d20ec8aSMatthew Dillon 	 * Set REPLY if the other end initiated the command.  Otherwise
18360d20ec8aSMatthew Dillon 	 * we are the command direction.
18370d20ec8aSMatthew Dillon 	 */
18380d20ec8aSMatthew Dillon 	if (state->txcmd & DMSGF_REPLY)
18390d20ec8aSMatthew Dillon 		cmd |= DMSGF_REPLY;
18400d20ec8aSMatthew Dillon 
18411b8eded1SMatthew Dillon 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1842d30cab67SMatthew Dillon 	if ((state->flags & DMSG_STATE_ROOT) == 0) {
18430d20ec8aSMatthew Dillon 		if ((state->txcmd & DMSGF_CREATE) == 0)
18440d20ec8aSMatthew Dillon 			nmsg->any.head.cmd |= DMSGF_CREATE;
18450d20ec8aSMatthew Dillon 	}
18460d20ec8aSMatthew Dillon 	nmsg->any.head.error = error;
18470c3a8cd0SMatthew Dillon 	dmsg_msg_write(nmsg);
18480c3a8cd0SMatthew Dillon }
18490c3a8cd0SMatthew Dillon 
18500c3a8cd0SMatthew Dillon /************************************************************************
18510c3a8cd0SMatthew Dillon  *			TRANSACTION STATE HANDLING			*
18520c3a8cd0SMatthew Dillon  ************************************************************************
18530c3a8cd0SMatthew Dillon  *
18540c3a8cd0SMatthew Dillon  */
18550c3a8cd0SMatthew Dillon 
18560c3a8cd0SMatthew Dillon /*
1857d30cab67SMatthew Dillon  * Process state tracking for a message after reception, prior to execution.
1858d30cab67SMatthew Dillon  * Possibly route the message (consuming it).
18590c3a8cd0SMatthew Dillon  *
18600c3a8cd0SMatthew Dillon  * Called with msglk held and the msg dequeued.
18610c3a8cd0SMatthew Dillon  *
18620c3a8cd0SMatthew Dillon  * Each message arrives with a dummy state; msg->state is updated to the
18630c3a8cd0SMatthew Dillon  * actual transaction state (one-off messages keep the dummy state).
18640c3a8cd0SMatthew Dillon  *
18650c3a8cd0SMatthew Dillon  * Returns 0 on success or a DMSG_IOQ_ERROR_* code (EALREADY for an ABORT
18660c3a8cd0SMatthew Dillon  * that raced a close, TRANS for protocol errors); callers presumably discard msg on error.
18670c3a8cd0SMatthew Dillon  *
18680c3a8cd0SMatthew Dillon  * --
18690c3a8cd0SMatthew Dillon  *
18700c3a8cd0SMatthew Dillon  * These routines handle persistent and command/reply message state via the
18710c3a8cd0SMatthew Dillon  * CREATE and DELETE flags.  The first message in a command or reply sequence
18720c3a8cd0SMatthew Dillon  * sets CREATE, the last message in a command or reply sequence sets DELETE.
18730c3a8cd0SMatthew Dillon  *
18740c3a8cd0SMatthew Dillon  * There can be any number of intermediate messages belonging to the same
18750c3a8cd0SMatthew Dillon  * sequence sent in between the CREATE message and the DELETE message,
18760c3a8cd0SMatthew Dillon  * which set neither flag.  This represents a streaming command or reply.
18770c3a8cd0SMatthew Dillon  *
18780c3a8cd0SMatthew Dillon  * Any command message received with CREATE set expects a reply sequence to
18790c3a8cd0SMatthew Dillon  * be returned.  Reply sequences work the same as command sequences except the
18800c3a8cd0SMatthew Dillon  * REPLY bit is also sent.  Both the command side and reply side can
18810c3a8cd0SMatthew Dillon  * degenerate into a single message with both CREATE and DELETE set.  Note
18820c3a8cd0SMatthew Dillon  * that one side can be streaming and the other side not, or neither, or both.
18830c3a8cd0SMatthew Dillon  *
18840c3a8cd0SMatthew Dillon  * The msgid is unique for the initiator.  That is, two sides sending a new
18850c3a8cd0SMatthew Dillon  * message can use the same msgid without colliding.
18860c3a8cd0SMatthew Dillon  *
18870c3a8cd0SMatthew Dillon  * --
18880c3a8cd0SMatthew Dillon  *
18890c3a8cd0SMatthew Dillon  * ABORT sequences work by setting the ABORT flag along with normal message
18900c3a8cd0SMatthew Dillon  * state.  However, ABORTs can also be sent on half-closed messages, that is
18910c3a8cd0SMatthew Dillon  * even if the command or reply side has already sent a DELETE, as long as
18920c3a8cd0SMatthew Dillon  * the message has not been fully closed it can still send an ABORT+DELETE
18930c3a8cd0SMatthew Dillon  * to terminate the half-closed message state.
18940c3a8cd0SMatthew Dillon  *
18950c3a8cd0SMatthew Dillon  * Since ABORT+DELETEs can race we silently discard ABORTs for message
18960c3a8cd0SMatthew Dillon  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
18970c3a8cd0SMatthew Dillon  * also race, and in this situation the other side might have already
18980c3a8cd0SMatthew Dillon  * initiated a new unrelated command with the same message id.  Since
18990c3a8cd0SMatthew Dillon  * the abort has not set the CREATE flag the situation can be detected
19000c3a8cd0SMatthew Dillon  * and the message will also be discarded.
19010c3a8cd0SMatthew Dillon  *
19020c3a8cd0SMatthew Dillon  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
19030c3a8cd0SMatthew Dillon  * The ABORT request is essentially integrated into the command instead
19040c3a8cd0SMatthew Dillon  * of being sent later on.  In this situation the command implementation
19050c3a8cd0SMatthew Dillon  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
19060c3a8cd0SMatthew Dillon  * special-case non-blocking operation for the command.
19070c3a8cd0SMatthew Dillon  *
19080c3a8cd0SMatthew Dillon  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
19090c3a8cd0SMatthew Dillon  *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
19100c3a8cd0SMatthew Dillon  *	  one-way messages are not supported.
19110c3a8cd0SMatthew Dillon  *
19120c3a8cd0SMatthew Dillon  * NOTE!  If a command sequence does not support aborts the ABORT flag is
19130c3a8cd0SMatthew Dillon  *	  simply ignored.
19140c3a8cd0SMatthew Dillon  *
19150c3a8cd0SMatthew Dillon  * --
19160c3a8cd0SMatthew Dillon  *
1917d30cab67SMatthew Dillon  * One-off messages (no reply expected) are sent without an established
1918d30cab67SMatthew Dillon  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
1919d30cab67SMatthew Dillon  * For one-off messages sent over circuits msgid generally MUST be 0.
1920d30cab67SMatthew Dillon  *
1921d30cab67SMatthew Dillon  * One-off messages cannot be aborted and typically aren't processed
1922d30cab67SMatthew Dillon  * by these routines.  Order is still guaranteed for messages sent over
1923d30cab67SMatthew Dillon  * the same circuit.  The REPLY bit can be used to distinguish whether
1924d30cab67SMatthew Dillon  * a one-off message is a command or reply.  For example, one-off replies
19250c3a8cd0SMatthew Dillon  * will typically just contain status updates.
19260c3a8cd0SMatthew Dillon  */
19270c3a8cd0SMatthew Dillon static int
19280c3a8cd0SMatthew Dillon dmsg_state_msgrx(dmsg_msg_t *msg)
19290c3a8cd0SMatthew Dillon {
19301b8eded1SMatthew Dillon 	dmsg_iocom_t *iocom = msg->state->iocom;
19310c3a8cd0SMatthew Dillon 	dmsg_state_t *state;
19321b8eded1SMatthew Dillon 	dmsg_state_t *pstate;
19330d20ec8aSMatthew Dillon 	dmsg_state_t sdummy;
19340c3a8cd0SMatthew Dillon 	int error;
19350c3a8cd0SMatthew Dillon 
19360d20ec8aSMatthew Dillon 	pthread_mutex_lock(&iocom->mtx);
19370d20ec8aSMatthew Dillon 
19380c3a8cd0SMatthew Dillon 	/*
1939d30cab67SMatthew Dillon 	 * Lookup the circuit (pstate).  The circuit will be an open
1940d30cab67SMatthew Dillon 	 * transaction.  The REVCIRC bit in the message tells us which side
1941d30cab67SMatthew Dillon 	 * initiated it.
19421b8eded1SMatthew Dillon 	 */
19431b8eded1SMatthew Dillon 	if (msg->any.head.circuit) {
19441b8eded1SMatthew Dillon 		sdummy.msgid = msg->any.head.circuit;
19451b8eded1SMatthew Dillon 
19461b8eded1SMatthew Dillon 		if (msg->any.head.cmd & DMSGF_REVCIRC) {
19471b8eded1SMatthew Dillon 			pstate = RB_FIND(dmsg_state_tree,
19481b8eded1SMatthew Dillon 					 &iocom->statewr_tree,
19491b8eded1SMatthew Dillon 					 &sdummy);
19501b8eded1SMatthew Dillon 		} else {
19511b8eded1SMatthew Dillon 			pstate = RB_FIND(dmsg_state_tree,
19521b8eded1SMatthew Dillon 					 &iocom->staterd_tree,
19531b8eded1SMatthew Dillon 					 &sdummy);
19541b8eded1SMatthew Dillon 		}
19551b8eded1SMatthew Dillon 		if (pstate == NULL) {
19561b8eded1SMatthew Dillon 			fprintf(stderr,
19571b8eded1SMatthew Dillon 				"missing parent in stacked trans %s\n",
19581b8eded1SMatthew Dillon 				dmsg_msg_str(msg));
19591b8eded1SMatthew Dillon 			error = DMSG_IOQ_ERROR_TRANS;
19601b8eded1SMatthew Dillon 			pthread_mutex_unlock(&iocom->mtx);
19611b8eded1SMatthew Dillon 			assert(0);
19621b8eded1SMatthew Dillon 		}
19631b8eded1SMatthew Dillon 	} else {
19641b8eded1SMatthew Dillon 		pstate = &iocom->state0;
19651b8eded1SMatthew Dillon 	}
19661b8eded1SMatthew Dillon 
19671b8eded1SMatthew Dillon 	/*
1968d30cab67SMatthew Dillon 	 * Lookup the msgid.
1969d30cab67SMatthew Dillon 	 *
1970d30cab67SMatthew Dillon 	 * If received msg is a command state is on staterd_tree.
1971d30cab67SMatthew Dillon 	 * If received msg is a reply state is on statewr_tree.
1972d30cab67SMatthew Dillon 	 * Otherwise there is no state (retain &iocom->state0)
1973d30cab67SMatthew Dillon 	 */
1974d30cab67SMatthew Dillon 	sdummy.msgid = msg->any.head.msgid;
1975d30cab67SMatthew Dillon 	if (msg->any.head.cmd & DMSGF_REVTRANS)
1976d30cab67SMatthew Dillon 		state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
1977d30cab67SMatthew Dillon 	else
1978d30cab67SMatthew Dillon 		state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
1979d30cab67SMatthew Dillon 
1980d30cab67SMatthew Dillon 	if (state) {
1981d30cab67SMatthew Dillon 		/*
1982d30cab67SMatthew Dillon 		 * Message over an existing transaction (CREATE should not
1983d30cab67SMatthew Dillon 		 * be set).
1984d30cab67SMatthew Dillon 		 */
1985d30cab67SMatthew Dillon 		msg->state = state;
1986d30cab67SMatthew Dillon 		assert(pstate == state->parent);
1987d30cab67SMatthew Dillon 	} else {
1988d30cab67SMatthew Dillon 		/*
1989d30cab67SMatthew Dillon 		 * Either a new transaction (if CREATE set) or a one-off.
1990d30cab67SMatthew Dillon 		 */
1991d30cab67SMatthew Dillon 		state = pstate;
1992d30cab67SMatthew Dillon 	}
1993d30cab67SMatthew Dillon 
1994d30cab67SMatthew Dillon 	pthread_mutex_unlock(&iocom->mtx);
1995d30cab67SMatthew Dillon 
1996d30cab67SMatthew Dillon 	/*
1997d30cab67SMatthew Dillon 	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1998d30cab67SMatthew Dillon 	 * inside the case statements.
1999d30cab67SMatthew Dillon 	 *
2000d30cab67SMatthew Dillon 	 * Construct new state as necessary.
2001d30cab67SMatthew Dillon 	 */
2002d30cab67SMatthew Dillon 	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2003d30cab67SMatthew Dillon 				    DMSGF_REPLY)) {
2004d30cab67SMatthew Dillon 	case DMSGF_CREATE:
2005d30cab67SMatthew Dillon 	case DMSGF_CREATE | DMSGF_DELETE:
2006d30cab67SMatthew Dillon 		/*
2007d30cab67SMatthew Dillon 		 * Create new sub-transaction under pstate.
2008d30cab67SMatthew Dillon 		 * (any DELETE is handled in post-processing of msg).
2009d30cab67SMatthew Dillon 		 *
2010d30cab67SMatthew Dillon 		 * (During routing the msgid was made unique for this
2011d30cab67SMatthew Dillon 		 * direction over the comlink, so our RB trees can be
2012d30cab67SMatthew Dillon 		 * iocom-based instead of state-based).
2013d30cab67SMatthew Dillon 		 */
2014d30cab67SMatthew Dillon 		if (state != pstate) {
2015d30cab67SMatthew Dillon 			fprintf(stderr,
2016d30cab67SMatthew Dillon 				"duplicate transaction %s\n",
2017d30cab67SMatthew Dillon 				dmsg_msg_str(msg));
2018d30cab67SMatthew Dillon 			error = DMSG_IOQ_ERROR_TRANS;
2019d30cab67SMatthew Dillon 			assert(0);
2020d30cab67SMatthew Dillon 			break;
2021d30cab67SMatthew Dillon 		}
2022d30cab67SMatthew Dillon 
2023d30cab67SMatthew Dillon 		/*
2024d30cab67SMatthew Dillon 		 * Allocate the new state.
20251b8eded1SMatthew Dillon 		 */
20260c3a8cd0SMatthew Dillon 		state = malloc(sizeof(*state));
2027323c0947SMatthew Dillon 		atomic_add_int(&dmsg_state_count, 1);
20280c3a8cd0SMatthew Dillon 		bzero(state, sizeof(*state));
20291b8eded1SMatthew Dillon 		TAILQ_INIT(&state->subq);
2030323c0947SMatthew Dillon 		dmsg_state_hold(pstate);
2031323c0947SMatthew Dillon 		state->refs = 1;
20321b8eded1SMatthew Dillon 		state->parent = pstate;
20330c3a8cd0SMatthew Dillon 		state->iocom = iocom;
20341b8eded1SMatthew Dillon 		state->flags = DMSG_STATE_DYNAMIC |
2035d30cab67SMatthew Dillon 			       DMSG_STATE_OPPOSITE;
20361b8eded1SMatthew Dillon 		state->msgid = msg->any.head.msgid;
20370c3a8cd0SMatthew Dillon 		state->txcmd = DMSGF_REPLY;
20380c3a8cd0SMatthew Dillon 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
20390d20ec8aSMatthew Dillon 		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
20400c3a8cd0SMatthew Dillon 		msg->state = state;
20410c3a8cd0SMatthew Dillon 		pthread_mutex_lock(&iocom->mtx);
20421b8eded1SMatthew Dillon 		RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
20431b8eded1SMatthew Dillon 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
20441b8eded1SMatthew Dillon 		state->flags |= DMSG_STATE_INSERTED;
2045d30cab67SMatthew Dillon 
2046d30cab67SMatthew Dillon 		/*
2047d30cab67SMatthew Dillon 		 * If the parent is a relay set up the state handler to
2048d30cab67SMatthew Dillon 		 * automatically route the message.  Local processing will
2049d30cab67SMatthew Dillon 		 * not occur if set.
2050d30cab67SMatthew Dillon 		 *
2051d30cab67SMatthew Dillon 		 * (state relays are seeded by SPAN processing)
2052d30cab67SMatthew Dillon 		 */
2053d30cab67SMatthew Dillon 		if (pstate->relay)
2054d30cab67SMatthew Dillon 			state->func = dmsg_state_relay;
20550c3a8cd0SMatthew Dillon 		pthread_mutex_unlock(&iocom->mtx);
20560c3a8cd0SMatthew Dillon 		error = 0;
2057d30cab67SMatthew Dillon 
20580c3a8cd0SMatthew Dillon 		if (DMsgDebugOpt) {
20591b8eded1SMatthew Dillon 			fprintf(stderr,
20601b8eded1SMatthew Dillon 				"create state %p id=%08x on iocom staterd %p\n",
20610c3a8cd0SMatthew Dillon 				state, (uint32_t)state->msgid, iocom);
20620c3a8cd0SMatthew Dillon 		}
20630c3a8cd0SMatthew Dillon 		break;
20640c3a8cd0SMatthew Dillon 	case DMSGF_DELETE:
20650c3a8cd0SMatthew Dillon 		/*
20660c3a8cd0SMatthew Dillon 		 * Persistent state is expected but might not exist if an
20670c3a8cd0SMatthew Dillon 		 * ABORT+DELETE races the close.
2068d30cab67SMatthew Dillon 		 *
2069d30cab67SMatthew Dillon 		 * (any DELETE is handled in post-processing of msg).
20700c3a8cd0SMatthew Dillon 		 */
2071d30cab67SMatthew Dillon 		if (state == pstate) {
20720c3a8cd0SMatthew Dillon 			if (msg->any.head.cmd & DMSGF_ABORT) {
20730c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_EALREADY;
20740c3a8cd0SMatthew Dillon 			} else {
20750c3a8cd0SMatthew Dillon 				fprintf(stderr, "missing-state %s\n",
20760c3a8cd0SMatthew Dillon 					dmsg_msg_str(msg));
20770c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_TRANS;
20780c3a8cd0SMatthew Dillon 				assert(0);
20790c3a8cd0SMatthew Dillon 			}
20800c3a8cd0SMatthew Dillon 			break;
20810c3a8cd0SMatthew Dillon 		}
20820c3a8cd0SMatthew Dillon 
20830c3a8cd0SMatthew Dillon 		/*
20840c3a8cd0SMatthew Dillon 		 * Handle another ABORT+DELETE case if the msgid has already
20850c3a8cd0SMatthew Dillon 		 * been reused.
20860c3a8cd0SMatthew Dillon 		 */
20870c3a8cd0SMatthew Dillon 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
20880c3a8cd0SMatthew Dillon 			if (msg->any.head.cmd & DMSGF_ABORT) {
20890c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_EALREADY;
20900c3a8cd0SMatthew Dillon 			} else {
20910c3a8cd0SMatthew Dillon 				fprintf(stderr, "reused-state %s\n",
20920c3a8cd0SMatthew Dillon 					dmsg_msg_str(msg));
20930c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_TRANS;
20940c3a8cd0SMatthew Dillon 				assert(0);
20950c3a8cd0SMatthew Dillon 			}
20960c3a8cd0SMatthew Dillon 			break;
20970c3a8cd0SMatthew Dillon 		}
20980c3a8cd0SMatthew Dillon 		error = 0;
20990c3a8cd0SMatthew Dillon 		break;
21000c3a8cd0SMatthew Dillon 	default:
21010c3a8cd0SMatthew Dillon 		/*
21020c3a8cd0SMatthew Dillon 		 * Check for mid-stream ABORT command received, otherwise
21030c3a8cd0SMatthew Dillon 		 * allow.
21040c3a8cd0SMatthew Dillon 		 */
21050c3a8cd0SMatthew Dillon 		if (msg->any.head.cmd & DMSGF_ABORT) {
2106d30cab67SMatthew Dillon 			if ((state == pstate) ||
21070c3a8cd0SMatthew Dillon 			    (state->rxcmd & DMSGF_CREATE) == 0) {
21080c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_EALREADY;
21090c3a8cd0SMatthew Dillon 				break;
21100c3a8cd0SMatthew Dillon 			}
21110c3a8cd0SMatthew Dillon 		}
21120c3a8cd0SMatthew Dillon 		error = 0;
21130c3a8cd0SMatthew Dillon 		break;
21140c3a8cd0SMatthew Dillon 	case DMSGF_REPLY | DMSGF_CREATE:
21150c3a8cd0SMatthew Dillon 	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
21160c3a8cd0SMatthew Dillon 		/*
21170c3a8cd0SMatthew Dillon 		 * When receiving a reply with CREATE set the original
21180c3a8cd0SMatthew Dillon 		 * persistent state message should already exist.
21190c3a8cd0SMatthew Dillon 		 */
2120d30cab67SMatthew Dillon 		if (state == pstate) {
21210c3a8cd0SMatthew Dillon 			fprintf(stderr, "no-state(r) %s\n",
21220c3a8cd0SMatthew Dillon 				dmsg_msg_str(msg));
21230c3a8cd0SMatthew Dillon 			error = DMSG_IOQ_ERROR_TRANS;
21240c3a8cd0SMatthew Dillon 			assert(0);
21250c3a8cd0SMatthew Dillon 			break;
21260c3a8cd0SMatthew Dillon 		}
2127d30cab67SMatthew Dillon 		assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
21280c3a8cd0SMatthew Dillon 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
21290c3a8cd0SMatthew Dillon 		error = 0;
21300c3a8cd0SMatthew Dillon 		break;
21310c3a8cd0SMatthew Dillon 	case DMSGF_REPLY | DMSGF_DELETE:
21320c3a8cd0SMatthew Dillon 		/*
21330c3a8cd0SMatthew Dillon 		 * Received REPLY+ABORT+DELETE in case where msgid has
21340c3a8cd0SMatthew Dillon 		 * already been fully closed, ignore the message.
21350c3a8cd0SMatthew Dillon 		 */
2136d30cab67SMatthew Dillon 		if (state == pstate) {
21370c3a8cd0SMatthew Dillon 			if (msg->any.head.cmd & DMSGF_ABORT) {
21380c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_EALREADY;
21390c3a8cd0SMatthew Dillon 			} else {
21400c3a8cd0SMatthew Dillon 				fprintf(stderr, "no-state(r,d) %s\n",
21410c3a8cd0SMatthew Dillon 					dmsg_msg_str(msg));
21420c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_TRANS;
21430c3a8cd0SMatthew Dillon 				assert(0);
21440c3a8cd0SMatthew Dillon 			}
21450c3a8cd0SMatthew Dillon 			break;
21460c3a8cd0SMatthew Dillon 		}
21470c3a8cd0SMatthew Dillon 
21480c3a8cd0SMatthew Dillon 		/*
21490c3a8cd0SMatthew Dillon 		 * Received REPLY+ABORT+DELETE in case where msgid has
21500c3a8cd0SMatthew Dillon 		 * already been reused for an unrelated message,
21510c3a8cd0SMatthew Dillon 		 * ignore the message.
21520c3a8cd0SMatthew Dillon 		 */
21530c3a8cd0SMatthew Dillon 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
21540c3a8cd0SMatthew Dillon 			if (msg->any.head.cmd & DMSGF_ABORT) {
21550c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_EALREADY;
21560c3a8cd0SMatthew Dillon 			} else {
21570c3a8cd0SMatthew Dillon 				fprintf(stderr, "reused-state(r,d) %s\n",
21580c3a8cd0SMatthew Dillon 					dmsg_msg_str(msg));
21590c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_TRANS;
21600c3a8cd0SMatthew Dillon 				assert(0);
21610c3a8cd0SMatthew Dillon 			}
21620c3a8cd0SMatthew Dillon 			break;
21630c3a8cd0SMatthew Dillon 		}
21640c3a8cd0SMatthew Dillon 		error = 0;
21650c3a8cd0SMatthew Dillon 		break;
21660c3a8cd0SMatthew Dillon 	case DMSGF_REPLY:
21670c3a8cd0SMatthew Dillon 		/*
21680c3a8cd0SMatthew Dillon 		 * Check for mid-stream ABORT reply received to sent command.
21690c3a8cd0SMatthew Dillon 		 */
21700c3a8cd0SMatthew Dillon 		if (msg->any.head.cmd & DMSGF_ABORT) {
2171d30cab67SMatthew Dillon 			if (state == pstate ||
21720c3a8cd0SMatthew Dillon 			    (state->rxcmd & DMSGF_CREATE) == 0) {
21730c3a8cd0SMatthew Dillon 				error = DMSG_IOQ_ERROR_EALREADY;
21740c3a8cd0SMatthew Dillon 				break;
21750c3a8cd0SMatthew Dillon 			}
21760c3a8cd0SMatthew Dillon 		}
21770c3a8cd0SMatthew Dillon 		error = 0;
21780c3a8cd0SMatthew Dillon 		break;
21790c3a8cd0SMatthew Dillon 	}
21808e226bc8SMatthew Dillon 
21818e226bc8SMatthew Dillon 	/*
21828e226bc8SMatthew Dillon 	 * Calculate the easy-switch() transactional command.  Represents
21838e226bc8SMatthew Dillon 	 * the outer-transaction command for any transaction-create or
21848e226bc8SMatthew Dillon 	 * transaction-delete, and the inner message command for any
21858e226bc8SMatthew Dillon 	 * non-transaction or inside-transaction command.  tcmd will be
21868e226bc8SMatthew Dillon 	 * set to 0 for any messaging error condition.
21878e226bc8SMatthew Dillon 	 *
21888e226bc8SMatthew Dillon 	 * The two can be told apart because outer-transaction commands
21898e226bc8SMatthew Dillon 	 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
21908e226bc8SMatthew Dillon 	 */
21918e226bc8SMatthew Dillon 	if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2192d30cab67SMatthew Dillon 		if ((state->flags & DMSG_STATE_ROOT) == 0) {
21938e226bc8SMatthew Dillon 			msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
21948e226bc8SMatthew Dillon 				    (msg->any.head.cmd & (DMSGF_CREATE |
21958e226bc8SMatthew Dillon 							  DMSGF_DELETE |
21968e226bc8SMatthew Dillon 							  DMSGF_REPLY));
21978e226bc8SMatthew Dillon 		} else {
21988e226bc8SMatthew Dillon 			msg->tcmd = 0;
21998e226bc8SMatthew Dillon 		}
22008e226bc8SMatthew Dillon 	} else {
22018e226bc8SMatthew Dillon 		msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
22028e226bc8SMatthew Dillon 	}
22030c3a8cd0SMatthew Dillon 	return (error);
22040c3a8cd0SMatthew Dillon }
22050c3a8cd0SMatthew Dillon 
22061b8eded1SMatthew Dillon /*
2207d30cab67SMatthew Dillon  * Route the message and handle pair-state processing.
22081b8eded1SMatthew Dillon  */
2209d30cab67SMatthew Dillon void
2210d30cab67SMatthew Dillon dmsg_state_relay(dmsg_msg_t *lmsg)
22111b8eded1SMatthew Dillon {
2212d30cab67SMatthew Dillon 	dmsg_state_t *lpstate;
2213d30cab67SMatthew Dillon 	dmsg_state_t *rpstate;
2214d30cab67SMatthew Dillon 	dmsg_state_t *lstate;
2215d30cab67SMatthew Dillon 	dmsg_state_t *rstate;
2216d30cab67SMatthew Dillon 	dmsg_msg_t *rmsg;
22171b8eded1SMatthew Dillon 
2218d30cab67SMatthew Dillon 	if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2219d30cab67SMatthew Dillon 	    DMSGF_CREATE) {
22201b8eded1SMatthew Dillon 		/*
2221d30cab67SMatthew Dillon 		 * New sub-transaction, establish new state and relay.
22221b8eded1SMatthew Dillon 		 */
2223d30cab67SMatthew Dillon 		lstate = lmsg->state;
2224d30cab67SMatthew Dillon 		lpstate = lstate->parent;
2225d30cab67SMatthew Dillon 		rpstate = lpstate->relay;
2226d30cab67SMatthew Dillon 		assert(lstate->relay == NULL);
2227d30cab67SMatthew Dillon 		assert(rpstate != NULL);
22281b8eded1SMatthew Dillon 
2229d30cab67SMatthew Dillon 		rmsg = dmsg_msg_alloc(rpstate,
2230d30cab67SMatthew Dillon 				      lmsg->aux_size,
2231d30cab67SMatthew Dillon 				      lmsg->any.head.cmd,
2232d30cab67SMatthew Dillon 				      dmsg_state_relay, NULL);
2233d30cab67SMatthew Dillon 		rstate = rmsg->state;
2234d30cab67SMatthew Dillon 		rstate->relay = lstate;
2235d30cab67SMatthew Dillon 		lstate->relay = rstate;
2236323c0947SMatthew Dillon 		dmsg_state_hold(lstate);
2237323c0947SMatthew Dillon 		dmsg_state_hold(rstate);
22381b8eded1SMatthew Dillon 	} else {
22391b8eded1SMatthew Dillon 		/*
2240d30cab67SMatthew Dillon 		 * State & relay already established
22411b8eded1SMatthew Dillon 		 */
2242d30cab67SMatthew Dillon 		lstate = lmsg->state;
2243d30cab67SMatthew Dillon 		rstate = lstate->relay;
2244d30cab67SMatthew Dillon 		assert(rstate != NULL);
2245d30cab67SMatthew Dillon 
2246d30cab67SMatthew Dillon 		rmsg = dmsg_msg_alloc(rstate,
2247d30cab67SMatthew Dillon 				      lmsg->aux_size,
2248d30cab67SMatthew Dillon 				      lmsg->any.head.cmd,
2249d30cab67SMatthew Dillon 				      dmsg_state_relay, NULL);
22501b8eded1SMatthew Dillon 	}
2251d30cab67SMatthew Dillon 	if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2252d30cab67SMatthew Dillon 		bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2253d30cab67SMatthew Dillon 		      lmsg->hdr_size - sizeof(lmsg->any.head));
2254d30cab67SMatthew Dillon 	}
2255d30cab67SMatthew Dillon 	rmsg->any.head.error = lmsg->any.head.error;
2256d30cab67SMatthew Dillon 	rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2257d30cab67SMatthew Dillon 	rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2258d30cab67SMatthew Dillon 	rmsg->aux_data = lmsg->aux_data;
2259d30cab67SMatthew Dillon 	lmsg->aux_data = NULL;
2260d30cab67SMatthew Dillon 	/*
2261d30cab67SMatthew Dillon 	fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2262d30cab67SMatthew Dillon 	*/
2263d30cab67SMatthew Dillon 	dmsg_msg_write(rmsg);
22641b8eded1SMatthew Dillon }
22651b8eded1SMatthew Dillon 
2266d30cab67SMatthew Dillon /*
2267d30cab67SMatthew Dillon  * Cleanup and retire msg after processing
2268d30cab67SMatthew Dillon  */
22690c3a8cd0SMatthew Dillon void
22700c3a8cd0SMatthew Dillon dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
22710c3a8cd0SMatthew Dillon {
22720c3a8cd0SMatthew Dillon 	dmsg_state_t *state;
22731b8eded1SMatthew Dillon 	dmsg_state_t *pstate;
22740c3a8cd0SMatthew Dillon 
22751b8eded1SMatthew Dillon 	assert(msg->state->iocom == iocom);
22761b8eded1SMatthew Dillon 	state = msg->state;
2277d30cab67SMatthew Dillon 	if (state->flags & DMSG_STATE_ROOT) {
22780c3a8cd0SMatthew Dillon 		/*
22790c3a8cd0SMatthew Dillon 		 * Free a non-transactional message, there is no state
22800c3a8cd0SMatthew Dillon 		 * to worry about.
22810c3a8cd0SMatthew Dillon 		 */
22820c3a8cd0SMatthew Dillon 		dmsg_msg_free(msg);
22830c3a8cd0SMatthew Dillon 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
22840c3a8cd0SMatthew Dillon 		/*
22850c3a8cd0SMatthew Dillon 		 * Message terminating transaction, destroy the related
22860c3a8cd0SMatthew Dillon 		 * state, the original message, and this message (if it
22870c3a8cd0SMatthew Dillon 		 * isn't the original message due to a CREATE|DELETE).
2288323c0947SMatthew Dillon 		 *
2289323c0947SMatthew Dillon 		 * It's possible for governing state to terminate while
2290323c0947SMatthew Dillon 		 * sub-transactions still exist.  This is allowed but
2291323c0947SMatthew Dillon 		 * will cause sub-transactions to recursively fail.
2292323c0947SMatthew Dillon 		 * Further reception of sub-transaction messages will be
2293323c0947SMatthew Dillon 		 * impossible because the circuit will no longer exist.
2294323c0947SMatthew Dillon 		 * (XXX need code to make sure that happens properly).
22950c3a8cd0SMatthew Dillon 		 */
22960c3a8cd0SMatthew Dillon 		pthread_mutex_lock(&iocom->mtx);
22970c3a8cd0SMatthew Dillon 		state->rxcmd |= DMSGF_DELETE;
2298323c0947SMatthew Dillon 
22990c3a8cd0SMatthew Dillon 		if (state->txcmd & DMSGF_DELETE) {
23000c3a8cd0SMatthew Dillon 			assert(state->flags & DMSG_STATE_INSERTED);
23010c3a8cd0SMatthew Dillon 			if (state->rxcmd & DMSGF_REPLY) {
23020c3a8cd0SMatthew Dillon 				assert(msg->any.head.cmd & DMSGF_REPLY);
23030c3a8cd0SMatthew Dillon 				RB_REMOVE(dmsg_state_tree,
23041b8eded1SMatthew Dillon 					  &iocom->statewr_tree, state);
23050c3a8cd0SMatthew Dillon 			} else {
23060c3a8cd0SMatthew Dillon 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
23070c3a8cd0SMatthew Dillon 				RB_REMOVE(dmsg_state_tree,
23081b8eded1SMatthew Dillon 					  &iocom->staterd_tree, state);
23091b8eded1SMatthew Dillon 			}
23101b8eded1SMatthew Dillon 			pstate = state->parent;
23111b8eded1SMatthew Dillon 			TAILQ_REMOVE(&pstate->subq, state, entry);
2312d30cab67SMatthew Dillon 			state->flags &= ~DMSG_STATE_INSERTED;
2313d30cab67SMatthew Dillon 			state->parent = NULL;
2314323c0947SMatthew Dillon 			dmsg_state_drop(pstate);
2315d30cab67SMatthew Dillon 
2316d30cab67SMatthew Dillon 			if (state->relay) {
2317323c0947SMatthew Dillon 				dmsg_state_drop(state->relay);
2318d30cab67SMatthew Dillon 				state->relay = NULL;
2319d30cab67SMatthew Dillon 			}
23201b8eded1SMatthew Dillon 			dmsg_msg_free(msg);
2321323c0947SMatthew Dillon 			dmsg_state_drop(state);
23220c3a8cd0SMatthew Dillon 		} else {
23231b8eded1SMatthew Dillon 			dmsg_msg_free(msg);
23240c3a8cd0SMatthew Dillon 		}
23250c3a8cd0SMatthew Dillon 		pthread_mutex_unlock(&iocom->mtx);
23261b8eded1SMatthew Dillon 	} else {
23270c3a8cd0SMatthew Dillon 		/*
23280c3a8cd0SMatthew Dillon 		 * Message not terminating transaction, leave state intact
23290c3a8cd0SMatthew Dillon 		 * and free message if it isn't the CREATE message.
23300c3a8cd0SMatthew Dillon 		 */
23310c3a8cd0SMatthew Dillon 		dmsg_msg_free(msg);
23320c3a8cd0SMatthew Dillon 	}
23330c3a8cd0SMatthew Dillon }
23340c3a8cd0SMatthew Dillon 
2335323c0947SMatthew Dillon /*
2336323c0947SMatthew Dillon  * Clean up the state after pulling out needed fields and queueing the
2337323c0947SMatthew Dillon  * message for transmission.   This occurs in dmsg_msg_write().
2338323c0947SMatthew Dillon  */
23390c3a8cd0SMatthew Dillon static void
23401b8eded1SMatthew Dillon dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
23410c3a8cd0SMatthew Dillon {
23420c3a8cd0SMatthew Dillon 	dmsg_state_t *state;
23431b8eded1SMatthew Dillon 	dmsg_state_t *pstate;
23440c3a8cd0SMatthew Dillon 
23451b8eded1SMatthew Dillon 	assert(iocom == msg->state->iocom);
23461b8eded1SMatthew Dillon 	state = msg->state;
2347d30cab67SMatthew Dillon 	if (state->flags & DMSG_STATE_ROOT) {
2348323c0947SMatthew Dillon 		;
23490c3a8cd0SMatthew Dillon 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2350323c0947SMatthew Dillon 		/*
2351323c0947SMatthew Dillon 		 * Message terminating transaction, destroy the related
2352323c0947SMatthew Dillon 		 * state, the original message, and this message (if it
2353323c0947SMatthew Dillon 		 * isn't the original message due to a CREATE|DELETE).
2354323c0947SMatthew Dillon 		 *
2355323c0947SMatthew Dillon 		 * It's possible for governing state to terminate while
2356323c0947SMatthew Dillon 		 * sub-transactions still exist.  This is allowed but
2357323c0947SMatthew Dillon 		 * will cause sub-transactions to recursively fail.
2358323c0947SMatthew Dillon 		 * Further reception of sub-transaction messages will be
2359323c0947SMatthew Dillon 		 * impossible because the circuit will no longer exist.
2360323c0947SMatthew Dillon 		 * (XXX need code to make sure that happens properly).
2361323c0947SMatthew Dillon 		 */
23620c3a8cd0SMatthew Dillon 		pthread_mutex_lock(&iocom->mtx);
23630d20ec8aSMatthew Dillon 		assert((state->txcmd & DMSGF_DELETE) == 0);
23640c3a8cd0SMatthew Dillon 		state->txcmd |= DMSGF_DELETE;
23650c3a8cd0SMatthew Dillon 		if (state->rxcmd & DMSGF_DELETE) {
23660c3a8cd0SMatthew Dillon 			assert(state->flags & DMSG_STATE_INSERTED);
23670c3a8cd0SMatthew Dillon 			if (state->txcmd & DMSGF_REPLY) {
23680c3a8cd0SMatthew Dillon 				assert(msg->any.head.cmd & DMSGF_REPLY);
23690c3a8cd0SMatthew Dillon 				RB_REMOVE(dmsg_state_tree,
23701b8eded1SMatthew Dillon 					  &iocom->staterd_tree, state);
23710c3a8cd0SMatthew Dillon 			} else {
23720c3a8cd0SMatthew Dillon 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
23730c3a8cd0SMatthew Dillon 				RB_REMOVE(dmsg_state_tree,
23741b8eded1SMatthew Dillon 					  &iocom->statewr_tree, state);
23751b8eded1SMatthew Dillon 			}
23761b8eded1SMatthew Dillon 			pstate = state->parent;
23771b8eded1SMatthew Dillon 			TAILQ_REMOVE(&pstate->subq, state, entry);
2378d30cab67SMatthew Dillon 			state->flags &= ~DMSG_STATE_INSERTED;
2379d30cab67SMatthew Dillon 			state->parent = NULL;
2380323c0947SMatthew Dillon 			dmsg_state_drop(pstate);
2381d30cab67SMatthew Dillon 
2382d30cab67SMatthew Dillon 			if (state->relay) {
2383323c0947SMatthew Dillon 				dmsg_state_drop(state->relay);
2384d30cab67SMatthew Dillon 				state->relay = NULL;
2385d30cab67SMatthew Dillon 			}
2386323c0947SMatthew Dillon 			dmsg_state_drop(state);	/* usually the last drop */
23870c3a8cd0SMatthew Dillon 		}
23880c3a8cd0SMatthew Dillon 		pthread_mutex_unlock(&iocom->mtx);
23890c3a8cd0SMatthew Dillon 	}
23900c3a8cd0SMatthew Dillon }
23910c3a8cd0SMatthew Dillon 
23920c3a8cd0SMatthew Dillon /*
2393323c0947SMatthew Dillon  * Called with or without locks
2394323c0947SMatthew Dillon  */
2395323c0947SMatthew Dillon void
2396323c0947SMatthew Dillon dmsg_state_hold(dmsg_state_t *state)
2397323c0947SMatthew Dillon {
2398323c0947SMatthew Dillon 	atomic_add_int(&state->refs, 1);
2399323c0947SMatthew Dillon }
2400323c0947SMatthew Dillon 
2401323c0947SMatthew Dillon void
2402323c0947SMatthew Dillon dmsg_state_drop(dmsg_state_t *state)
2403323c0947SMatthew Dillon {
2404323c0947SMatthew Dillon 	if (atomic_fetchadd_int(&state->refs, -1) == 1)
2405323c0947SMatthew Dillon 		dmsg_state_free(state);
2406323c0947SMatthew Dillon }
2407323c0947SMatthew Dillon 
2408323c0947SMatthew Dillon /*
24090c3a8cd0SMatthew Dillon  * Called with iocom locked
24100c3a8cd0SMatthew Dillon  */
2411323c0947SMatthew Dillon static void
24120c3a8cd0SMatthew Dillon dmsg_state_free(dmsg_state_t *state)
24130c3a8cd0SMatthew Dillon {
2414323c0947SMatthew Dillon 	atomic_add_int(&dmsg_state_count, -1);
24150c3a8cd0SMatthew Dillon 	if (DMsgDebugOpt) {
24160c3a8cd0SMatthew Dillon 		fprintf(stderr, "terminate state %p id=%08x\n",
24170c3a8cd0SMatthew Dillon 			state, (uint32_t)state->msgid);
24180c3a8cd0SMatthew Dillon 	}
2419323c0947SMatthew Dillon 	assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_INSERTED)) == 0);
2420323c0947SMatthew Dillon 	assert(TAILQ_EMPTY(&state->subq));
2421323c0947SMatthew Dillon 	assert(state->refs == 0);
2422f306de83SMatthew Dillon 	if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2423f306de83SMatthew Dillon 		closefrom(3);
24240c3a8cd0SMatthew Dillon 	assert(state->any.any == NULL);
24250c3a8cd0SMatthew Dillon 	free(state);
24260d20ec8aSMatthew Dillon }
24270c3a8cd0SMatthew Dillon 
24280c3a8cd0SMatthew Dillon /*
24290c3a8cd0SMatthew Dillon  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
24300c3a8cd0SMatthew Dillon  * header is not adjusted, just the core header.
24310c3a8cd0SMatthew Dillon  */
24320c3a8cd0SMatthew Dillon void
24330c3a8cd0SMatthew Dillon dmsg_bswap_head(dmsg_hdr_t *head)
24340c3a8cd0SMatthew Dillon {
24350c3a8cd0SMatthew Dillon 	head->magic	= bswap16(head->magic);
24360c3a8cd0SMatthew Dillon 	head->reserved02 = bswap16(head->reserved02);
24370c3a8cd0SMatthew Dillon 	head->salt	= bswap32(head->salt);
24380c3a8cd0SMatthew Dillon 
24390c3a8cd0SMatthew Dillon 	head->msgid	= bswap64(head->msgid);
24400d20ec8aSMatthew Dillon 	head->circuit	= bswap64(head->circuit);
24410d20ec8aSMatthew Dillon 	head->reserved18= bswap64(head->reserved18);
24420c3a8cd0SMatthew Dillon 
24430c3a8cd0SMatthew Dillon 	head->cmd	= bswap32(head->cmd);
24440c3a8cd0SMatthew Dillon 	head->aux_crc	= bswap32(head->aux_crc);
24450c3a8cd0SMatthew Dillon 	head->aux_bytes	= bswap32(head->aux_bytes);
24460c3a8cd0SMatthew Dillon 	head->error	= bswap32(head->error);
24470c3a8cd0SMatthew Dillon 	head->aux_descr = bswap64(head->aux_descr);
24480c3a8cd0SMatthew Dillon 	head->reserved38= bswap32(head->reserved38);
24490c3a8cd0SMatthew Dillon 	head->hdr_crc	= bswap32(head->hdr_crc);
24500c3a8cd0SMatthew Dillon }
2451