1 /*-
2  * Copyright (c) 2000-2005 MAEKAWA Masahide <maekawa@cvsync.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of its contributors
14  *    may be used to endorse or promote products derived from this software
15  *    without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 
34 #include <errno.h>
35 #include <pthread.h>
36 #include <string.h>
37 
38 #include <zlib.h>
39 
40 #include "compat_stdbool.h"
41 #include "compat_stdint.h"
42 #include "compat_inttypes.h"
43 #include "basedef.h"
44 
45 #include "logmsg.h"
46 #include "mux.h"
47 #include "mux_zlib.h"
48 #include "network.h"
49 
50 #include "receiver.h"
51 
52 bool
receiver_data_zlib(struct mux * mx,uint8_t chnum)53 receiver_data_zlib(struct mux *mx, uint8_t chnum)
54 {
55 	struct muxbuf *mxb = &mx->mx_buffer[MUX_IN][chnum];
56 	struct mux_stream_zlib *stream = mx->mx_stream;
57 	z_stream *z = &stream->ms_zstream_in;
58 	uint16_t mss;
59 	uint8_t *cmd = mx->mx_recvcmd;
60 	size_t len, len1, len2, tail;
61 	int err, zflag;
62 
63 	if (!sock_recv(mx->mx_socket, cmd, MUX_CMDLEN_DATA - 2)) {
64 		logmsg_err("Receiver(DATA) Error: recv");
65 		return (false);
66 	}
67 
68 	mss = GetWord(cmd);
69 	if ((mss == 0) || (mss > mxb->mxb_mss)) {
70 		logmsg_err("Receiver(DATA) Error: invalid length: %u", mss);
71 		return (false);
72 	}
73 
74 	if (!sock_recv(mx->mx_socket, stream->ms_zbuffer_in, (size_t)mss)) {
75 		logmsg_err("Receiver(DATA) Error: recv");
76 		return (false);
77 	}
78 
79 	z->next_in = stream->ms_zbuffer_in;
80 	z->avail_in = mss;
81 
82 	do {
83 		if ((err = pthread_mutex_lock(&mxb->mxb_lock)) != 0) {
84 			logmsg_err("Receiver(DATA) Error: mutex lock: %s",
85 				   strerror(err));
86 			return (false);
87 		}
88 		if (mxb->mxb_state != MUX_STATE_RUNNING) {
89 			logmsg_err("Receiver(DATA) Error: not running: %u",
90 				   chnum);
91 			mxb->mxb_state = MUX_STATE_ERROR;
92 			pthread_mutex_unlock(&mxb->mxb_lock);
93 			return (false);
94 		}
95 
96 		while ((len = mxb->mxb_bufsize - mxb->mxb_length) == 0) {
97 			logmsg_debug(DEBUG_BASE, "Receriver: Sleep(%u): %u %u",
98 				     chnum, mxb->mxb_length, mxb->mxb_bufsize);
99 			if ((err = pthread_cond_wait(&mxb->mxb_wait_in,
100 						     &mxb->mxb_lock)) != 0) {
101 				logmsg_err("Receiver(DATA) Error: cond_wait: "
102 					   "%s", strerror(err));
103 				mxb->mxb_state = MUX_STATE_ERROR;
104 				pthread_mutex_unlock(&mxb->mxb_lock);
105 				return (false);
106 			}
107 			logmsg_debug(DEBUG_BASE, "Receriver: Wakeup(%u): %u",
108 				     chnum, mxb->mxb_length);
109 			if (mxb->mxb_state != MUX_STATE_RUNNING) {
110 				logmsg_err("Receiver(DATA) Error: "
111 					   "not running: %u", chnum);
112 				mxb->mxb_state = MUX_STATE_ERROR;
113 				pthread_mutex_unlock(&mxb->mxb_lock);
114 				return (false);
115 			}
116 		}
117 
118 		tail = mxb->mxb_head + mxb->mxb_length;
119 		if (tail >= mxb->mxb_bufsize)
120 			tail -= mxb->mxb_bufsize;
121 		if ((len1 = tail + len) > mxb->mxb_bufsize) {
122 			len2 = len1 - mxb->mxb_bufsize;
123 			len1 = len - len2;
124 			zflag = 0;
125 		} else {
126 			len2 = 0;
127 			zflag = Z_FINISH;
128 		}
129 
130 		z->next_out = &mxb->mxb_buffer[tail];
131 		z->avail_out = len1;
132 		err = inflate(z, zflag);
133 		if ((err != Z_STREAM_END) && (err != Z_OK)) {
134 			logmsg_err("Receiver(DATA) Error: INFLATE: %s",
135 				   z->msg);
136 			mxb->mxb_state = MUX_STATE_ERROR;
137 			pthread_mutex_unlock(&mxb->mxb_lock);
138 			return (false);
139 		}
140 
141 		if ((z->avail_in != 0) && (len2 > 0)) {
142 			z->next_out = mxb->mxb_buffer;
143 			z->avail_out = len2;
144 			err = inflate(z, Z_FINISH);
145 			if ((err != Z_STREAM_END) && (err != Z_OK)) {
146 				logmsg_err("Receiver(DATA) Error: INFLATE: %s",
147 					   z->msg);
148 				mxb->mxb_state = MUX_STATE_ERROR;
149 				pthread_mutex_unlock(&mxb->mxb_lock);
150 				return (false);
151 			}
152 		}
153 
154 		mx->mx_xfer_in += z->total_out;
155 		mxb->mxb_length += z->total_out;
156 		z->total_out = 0;
157 
158 		if ((err = pthread_cond_signal(&mxb->mxb_wait_out)) != 0) {
159 			logmsg_err("Receiver(DATA) Error: cond signal: %s",
160 				   strerror(err));
161 			mxb->mxb_state = MUX_STATE_ERROR;
162 			pthread_mutex_unlock(&mxb->mxb_lock);
163 			return (false);
164 		}
165 
166 		if ((err = pthread_mutex_unlock(&mxb->mxb_lock)) != 0) {
167 			logmsg_err("Receiver(DATA) Error: mutex unlock: %s",
168 				   strerror(err));
169 			return(false);
170 		}
171 	} while (z->avail_in > 0);
172 
173 	if (inflateReset(z) != Z_OK) {
174 		logmsg_err("Receiver(DATA) Error: INFLATE(reset): %s", z->msg);
175 		return (false);
176 	}
177 
178 	return (true);
179 }
180