1 /*-
2  * Copyright (c) 2000-2005 MAEKAWA Masahide <maekawa@cvsync.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of its contributors
14  *    may be used to endorse or promote products derived from this software
15  *    without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 
33 #include <limits.h>
34 #include <pthread.h>
35 #include <string.h>
36 
37 #include "compat_stdbool.h"
38 #include "compat_stdint.h"
39 #include "compat_inttypes.h"
40 #include "compat_limits.h"
41 #include "basedef.h"
42 
43 #include "cvsync.h"
44 #include "logmsg.h"
45 #include "mux.h"
46 #include "network.h"
47 
48 #include "receiver.h"
49 
50 void *
receiver(void * arg)51 receiver(void *arg)
52 {
53 	struct mux *mx = (struct mux *)arg;
54 	uint8_t *cmd = mx->mx_recvcmd, chnum;
55 	int err;
56 
57 	for (;;) {
58 		if ((err = pthread_mutex_lock(&mx->mx_lock)) != 0) {
59 			logmsg_err("Receiver Error: mutex lock: %s",
60 				   strerror(err));
61 			mux_abort(mx);
62 			return (CVSYNC_THREAD_FAILURE);
63 		}
64 		if (!mx->mx_isconnected) {
65 			logmsg_err("Receiver Error: socket");
66 			pthread_mutex_unlock(&mx->mx_lock);
67 			mux_abort(mx);
68 			return (CVSYNC_THREAD_FAILURE);
69 		}
70 
71 		if (mx->mx_state[MUX_IN][0] && mx->mx_state[MUX_IN][1])
72 			break;
73 
74 		if ((err = pthread_mutex_unlock(&mx->mx_lock)) != 0) {
75 			logmsg_err("Receiver Error: mutex unlock: %s",
76 				   strerror(err));
77 			mux_abort(mx);
78 			return (CVSYNC_THREAD_FAILURE);
79 		}
80 
81 		if (!sock_recv(mx->mx_socket, cmd, 2)) {
82 			logmsg_err("Receiver Error: recv");
83 			mux_abort(mx);
84 			return (CVSYNC_THREAD_FAILURE);
85 		}
86 		chnum = cmd[1];
87 		if ((chnum != 0) && (chnum != 1)) {
88 			logmsg_err("Receiver Error: invalid channel: %u",
89 				   chnum);
90 			mux_abort(mx);
91 			return (CVSYNC_THREAD_FAILURE);
92 		}
93 
94 		switch (cmd[0]) {
95 		case MUX_CMD_CLOSE:
96 			if (!receiver_close(mx, chnum)) {
97 				mux_abort(mx);
98 				return (CVSYNC_THREAD_FAILURE);
99 			}
100 			break;
101 		case MUX_CMD_DATA:
102 			switch (mx->mx_compress) {
103 			case CVSYNC_COMPRESS_NO:
104 				if (!receiver_data_raw(mx, chnum)) {
105 					mux_abort(mx);
106 					return (CVSYNC_THREAD_FAILURE);
107 				}
108 				break;
109 			case CVSYNC_COMPRESS_ZLIB:
110 				if (!receiver_data_zlib(mx, chnum)) {
111 					mux_abort(mx);
112 					return (CVSYNC_THREAD_FAILURE);
113 				}
114 				break;
115 			default:
116 				logmsg_err("Receiver Error: unknown "
117 					   "compression type: %d",
118 					   mx->mx_compress);
119 				mux_abort(mx);
120 				return (CVSYNC_THREAD_FAILURE);
121 			}
122 			break;
123 		case MUX_CMD_RESET:
124 			if (!receiver_reset(mx, chnum)) {
125 				mux_abort(mx);
126 				return (CVSYNC_THREAD_FAILURE);
127 			}
128 			break;
129 		default:
130 			logmsg_err("Receiver Error: unknown command: %02x",
131 				   cmd[0]);
132 			mux_abort(mx);
133 			return (CVSYNC_THREAD_FAILURE);
134 		}
135 	}
136 
137 	while (!mx->mx_state[MUX_OUT][0] || !mx->mx_state[MUX_OUT][1]) {
138 		logmsg_debug(DEBUG_BASE, "Receiver: Sleep: %u %u",
139 			     mx->mx_state[MUX_OUT][0],
140 			     mx->mx_state[MUX_OUT][1]);
141 		if ((err = pthread_cond_wait(&mx->mx_wait,
142 					     &mx->mx_lock)) != 0) {
143 			logmsg_err("Receiver Error: cond wait: %s",
144 				   strerror(err));
145 			pthread_mutex_unlock(&mx->mx_lock);
146 			mux_abort(mx);
147 			return (CVSYNC_THREAD_FAILURE);
148 		}
149 		logmsg_debug(DEBUG_BASE, "Receiver: Wakeup: %u %u",
150 			     mx->mx_state[MUX_OUT][0],
151 			     mx->mx_state[MUX_OUT][1]);
152 		if (!mx->mx_isconnected) {
153 			logmsg_err("Receiver Error: socket");
154 			pthread_mutex_unlock(&mx->mx_lock);
155 			mux_abort(mx);
156 			return (CVSYNC_THREAD_FAILURE);
157 		}
158 	}
159 
160 	if ((err = pthread_mutex_unlock(&mx->mx_lock)) != 0) {
161 		logmsg_err("Receiver Error: mutex unlock: %s", strerror(err));
162 		mux_abort(mx);
163 		return (CVSYNC_THREAD_FAILURE);
164 	}
165 
166 	return (CVSYNC_THREAD_SUCCESS);
167 }
168 
169 bool
receiver_close(struct mux * mx,uint8_t chnum)170 receiver_close(struct mux *mx, uint8_t chnum)
171 {
172 	struct muxbuf *mxb = &mx->mx_buffer[MUX_OUT][chnum];
173 	int err;
174 
175 	if ((err = pthread_mutex_lock(&mxb->mxb_lock)) != 0) {
176 		logmsg_err("Receiver(CLOSE) Error: mutex lock: %s",
177 			   strerror(err));
178 		return (false);
179 	}
180 	if (mxb->mxb_state != MUX_STATE_RUNNING) {
181 		logmsg_err("Receiver(CLOSE) Error: not running: %u", chnum);
182 		mxb->mxb_state = MUX_STATE_ERROR;
183 		pthread_mutex_unlock(&mxb->mxb_lock);
184 		return (false);
185 	}
186 
187 	if ((mxb->mxb_length != 0) || (mxb->mxb_rlength != 0)) {
188 		logmsg_err("Receiver(CLOSE) Error: work in progress");
189 		mxb->mxb_state = MUX_STATE_ERROR;
190 		pthread_mutex_unlock(&mxb->mxb_lock);
191 		return (false);
192 	}
193 	mxb->mxb_state = MUX_STATE_CLOSED;
194 
195 	if ((err = pthread_cond_signal(&mxb->mxb_wait_in)) != 0) {
196 		logmsg_err("Receiver(CLOSE) Error: cond signal: %s",
197 			   strerror(err));
198 		mxb->mxb_state = MUX_STATE_ERROR;
199 		pthread_mutex_unlock(&mxb->mxb_lock);
200 		return (false);
201 	}
202 
203 	if ((err = pthread_mutex_unlock(&mxb->mxb_lock)) != 0) {
204 		logmsg_err("Receiver(CLOSE) Error: mutex unlock: %s",
205 			   strerror(err));
206 		return (false);
207 	}
208 
209 	if ((err = pthread_mutex_lock(&mx->mx_lock)) != 0) {
210 		logmsg_err("Receiver(CLOSE) Error: mutex lock: %s",
211 			   strerror(err));
212 		return (false);
213 	}
214 	if (mx->mx_state[MUX_IN][chnum]) {
215 		logmsg_err("Receiver(CLOSE) Error: not active: %u", chnum);
216 		pthread_mutex_unlock(&mx->mx_lock);
217 		return (false);
218 	}
219 	mx->mx_state[MUX_IN][chnum] = true;
220 	if ((err = pthread_mutex_unlock(&mx->mx_lock)) != 0) {
221 		logmsg_err("Receiver(CLOSE) Error: mutex unlock: %s",
222 			   strerror(err));
223 		return (false);
224 	}
225 
226 	return (true);
227 }
228 
229 bool
receiver_reset(struct mux * mx,uint8_t chnum)230 receiver_reset(struct mux *mx, uint8_t chnum)
231 {
232 	struct muxbuf *mxb = &mx->mx_buffer[MUX_OUT][chnum];
233 	uint32_t len;
234 	uint8_t *cmd = mx->mx_recvcmd;
235 	int err;
236 
237 	if (!sock_recv(mx->mx_socket, cmd, MUX_CMDLEN_RESET - 2)) {
238 		logmsg_err("Receiver(RESET) Error: recv");
239 		return (false);
240 	}
241 	if ((len = GetDWord(cmd)) == 0) {
242 		logmsg_err("Receiver(RESET) Error: invalid length: %u", len);
243 		return (false);
244 	}
245 
246 	if ((err = pthread_mutex_lock(&mxb->mxb_lock)) != 0) {
247 		logmsg_err("Receiver(RESET) Error: mutex lock: %s",
248 			   strerror(err));
249 		return (false);
250 	}
251 	if (mxb->mxb_state != MUX_STATE_RUNNING) {
252 		logmsg_err("Receiver(RESET) Error: not running: %u", chnum);
253 		mxb->mxb_state = MUX_STATE_ERROR;
254 		pthread_mutex_unlock(&mxb->mxb_lock);
255 		return (false);
256 	}
257 
258 	if (len > mxb->mxb_rlength) {
259 		logmsg_err("Receiver(RESET) Error: invalid length: %u > "
260 			   "%u(rlength)", len, mxb->mxb_rlength);
261 		mxb->mxb_state = MUX_STATE_ERROR;
262 		pthread_mutex_unlock(&mxb->mxb_lock);
263 		return (false);
264 	}
265 
266 	logmsg_debug(DEBUG_BASE, "Receiver(RESET) %u: %u -> %u", chnum,
267 		     mxb->mxb_rlength, mxb->mxb_rlength - len);
268 
269 	mxb->mxb_rlength -= len;
270 
271 	if ((err = pthread_cond_signal(&mxb->mxb_wait_in)) != 0) {
272 		logmsg_err("Receiver(RESET) Error: cond signal: %s",
273 			   strerror(err));
274 		mxb->mxb_state = MUX_STATE_ERROR;
275 		pthread_mutex_unlock(&mxb->mxb_lock);
276 		return (false);
277 	}
278 
279 	if ((err = pthread_mutex_unlock(&mxb->mxb_lock)) != 0) {
280 		logmsg_err("Receiver(RESET) Error: mutex unlock: %s",
281 			   strerror(err));
282 		return (false);
283 	}
284 
285 	return (true);
286 }
287