1 /*-
2 * Copyright (c) 2000-2005 MAEKAWA Masahide <maekawa@cvsync.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the author nor the names of its contributors
14 * may be used to endorse or promote products derived from this software
15 * without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30 #include <sys/types.h>
31 #include <sys/socket.h>
32
33 #include <limits.h>
34 #include <pthread.h>
35 #include <string.h>
36
37 #include "compat_stdbool.h"
38 #include "compat_stdint.h"
39 #include "compat_inttypes.h"
40 #include "compat_limits.h"
41 #include "basedef.h"
42
43 #include "cvsync.h"
44 #include "logmsg.h"
45 #include "mux.h"
46 #include "network.h"
47
48 #include "receiver.h"
49
50 void *
receiver(void * arg)51 receiver(void *arg)
52 {
53 struct mux *mx = (struct mux *)arg;
54 uint8_t *cmd = mx->mx_recvcmd, chnum;
55 int err;
56
57 for (;;) {
58 if ((err = pthread_mutex_lock(&mx->mx_lock)) != 0) {
59 logmsg_err("Receiver Error: mutex lock: %s",
60 strerror(err));
61 mux_abort(mx);
62 return (CVSYNC_THREAD_FAILURE);
63 }
64 if (!mx->mx_isconnected) {
65 logmsg_err("Receiver Error: socket");
66 pthread_mutex_unlock(&mx->mx_lock);
67 mux_abort(mx);
68 return (CVSYNC_THREAD_FAILURE);
69 }
70
71 if (mx->mx_state[MUX_IN][0] && mx->mx_state[MUX_IN][1])
72 break;
73
74 if ((err = pthread_mutex_unlock(&mx->mx_lock)) != 0) {
75 logmsg_err("Receiver Error: mutex unlock: %s",
76 strerror(err));
77 mux_abort(mx);
78 return (CVSYNC_THREAD_FAILURE);
79 }
80
81 if (!sock_recv(mx->mx_socket, cmd, 2)) {
82 logmsg_err("Receiver Error: recv");
83 mux_abort(mx);
84 return (CVSYNC_THREAD_FAILURE);
85 }
86 chnum = cmd[1];
87 if ((chnum != 0) && (chnum != 1)) {
88 logmsg_err("Receiver Error: invalid channel: %u",
89 chnum);
90 mux_abort(mx);
91 return (CVSYNC_THREAD_FAILURE);
92 }
93
94 switch (cmd[0]) {
95 case MUX_CMD_CLOSE:
96 if (!receiver_close(mx, chnum)) {
97 mux_abort(mx);
98 return (CVSYNC_THREAD_FAILURE);
99 }
100 break;
101 case MUX_CMD_DATA:
102 switch (mx->mx_compress) {
103 case CVSYNC_COMPRESS_NO:
104 if (!receiver_data_raw(mx, chnum)) {
105 mux_abort(mx);
106 return (CVSYNC_THREAD_FAILURE);
107 }
108 break;
109 case CVSYNC_COMPRESS_ZLIB:
110 if (!receiver_data_zlib(mx, chnum)) {
111 mux_abort(mx);
112 return (CVSYNC_THREAD_FAILURE);
113 }
114 break;
115 default:
116 logmsg_err("Receiver Error: unknown "
117 "compression type: %d",
118 mx->mx_compress);
119 mux_abort(mx);
120 return (CVSYNC_THREAD_FAILURE);
121 }
122 break;
123 case MUX_CMD_RESET:
124 if (!receiver_reset(mx, chnum)) {
125 mux_abort(mx);
126 return (CVSYNC_THREAD_FAILURE);
127 }
128 break;
129 default:
130 logmsg_err("Receiver Error: unknown command: %02x",
131 cmd[0]);
132 mux_abort(mx);
133 return (CVSYNC_THREAD_FAILURE);
134 }
135 }
136
137 while (!mx->mx_state[MUX_OUT][0] || !mx->mx_state[MUX_OUT][1]) {
138 logmsg_debug(DEBUG_BASE, "Receiver: Sleep: %u %u",
139 mx->mx_state[MUX_OUT][0],
140 mx->mx_state[MUX_OUT][1]);
141 if ((err = pthread_cond_wait(&mx->mx_wait,
142 &mx->mx_lock)) != 0) {
143 logmsg_err("Receiver Error: cond wait: %s",
144 strerror(err));
145 pthread_mutex_unlock(&mx->mx_lock);
146 mux_abort(mx);
147 return (CVSYNC_THREAD_FAILURE);
148 }
149 logmsg_debug(DEBUG_BASE, "Receiver: Wakeup: %u %u",
150 mx->mx_state[MUX_OUT][0],
151 mx->mx_state[MUX_OUT][1]);
152 if (!mx->mx_isconnected) {
153 logmsg_err("Receiver Error: socket");
154 pthread_mutex_unlock(&mx->mx_lock);
155 mux_abort(mx);
156 return (CVSYNC_THREAD_FAILURE);
157 }
158 }
159
160 if ((err = pthread_mutex_unlock(&mx->mx_lock)) != 0) {
161 logmsg_err("Receiver Error: mutex unlock: %s", strerror(err));
162 mux_abort(mx);
163 return (CVSYNC_THREAD_FAILURE);
164 }
165
166 return (CVSYNC_THREAD_SUCCESS);
167 }
168
169 bool
receiver_close(struct mux * mx,uint8_t chnum)170 receiver_close(struct mux *mx, uint8_t chnum)
171 {
172 struct muxbuf *mxb = &mx->mx_buffer[MUX_OUT][chnum];
173 int err;
174
175 if ((err = pthread_mutex_lock(&mxb->mxb_lock)) != 0) {
176 logmsg_err("Receiver(CLOSE) Error: mutex lock: %s",
177 strerror(err));
178 return (false);
179 }
180 if (mxb->mxb_state != MUX_STATE_RUNNING) {
181 logmsg_err("Receiver(CLOSE) Error: not running: %u", chnum);
182 mxb->mxb_state = MUX_STATE_ERROR;
183 pthread_mutex_unlock(&mxb->mxb_lock);
184 return (false);
185 }
186
187 if ((mxb->mxb_length != 0) || (mxb->mxb_rlength != 0)) {
188 logmsg_err("Receiver(CLOSE) Error: work in progress");
189 mxb->mxb_state = MUX_STATE_ERROR;
190 pthread_mutex_unlock(&mxb->mxb_lock);
191 return (false);
192 }
193 mxb->mxb_state = MUX_STATE_CLOSED;
194
195 if ((err = pthread_cond_signal(&mxb->mxb_wait_in)) != 0) {
196 logmsg_err("Receiver(CLOSE) Error: cond signal: %s",
197 strerror(err));
198 mxb->mxb_state = MUX_STATE_ERROR;
199 pthread_mutex_unlock(&mxb->mxb_lock);
200 return (false);
201 }
202
203 if ((err = pthread_mutex_unlock(&mxb->mxb_lock)) != 0) {
204 logmsg_err("Receiver(CLOSE) Error: mutex unlock: %s",
205 strerror(err));
206 return (false);
207 }
208
209 if ((err = pthread_mutex_lock(&mx->mx_lock)) != 0) {
210 logmsg_err("Receiver(CLOSE) Error: mutex lock: %s",
211 strerror(err));
212 return (false);
213 }
214 if (mx->mx_state[MUX_IN][chnum]) {
215 logmsg_err("Receiver(CLOSE) Error: not active: %u", chnum);
216 pthread_mutex_unlock(&mx->mx_lock);
217 return (false);
218 }
219 mx->mx_state[MUX_IN][chnum] = true;
220 if ((err = pthread_mutex_unlock(&mx->mx_lock)) != 0) {
221 logmsg_err("Receiver(CLOSE) Error: mutex unlock: %s",
222 strerror(err));
223 return (false);
224 }
225
226 return (true);
227 }
228
229 bool
receiver_reset(struct mux * mx,uint8_t chnum)230 receiver_reset(struct mux *mx, uint8_t chnum)
231 {
232 struct muxbuf *mxb = &mx->mx_buffer[MUX_OUT][chnum];
233 uint32_t len;
234 uint8_t *cmd = mx->mx_recvcmd;
235 int err;
236
237 if (!sock_recv(mx->mx_socket, cmd, MUX_CMDLEN_RESET - 2)) {
238 logmsg_err("Receiver(RESET) Error: recv");
239 return (false);
240 }
241 if ((len = GetDWord(cmd)) == 0) {
242 logmsg_err("Receiver(RESET) Error: invalid length: %u", len);
243 return (false);
244 }
245
246 if ((err = pthread_mutex_lock(&mxb->mxb_lock)) != 0) {
247 logmsg_err("Receiver(RESET) Error: mutex lock: %s",
248 strerror(err));
249 return (false);
250 }
251 if (mxb->mxb_state != MUX_STATE_RUNNING) {
252 logmsg_err("Receiver(RESET) Error: not running: %u", chnum);
253 mxb->mxb_state = MUX_STATE_ERROR;
254 pthread_mutex_unlock(&mxb->mxb_lock);
255 return (false);
256 }
257
258 if (len > mxb->mxb_rlength) {
259 logmsg_err("Receiver(RESET) Error: invalid length: %u > "
260 "%u(rlength)", len, mxb->mxb_rlength);
261 mxb->mxb_state = MUX_STATE_ERROR;
262 pthread_mutex_unlock(&mxb->mxb_lock);
263 return (false);
264 }
265
266 logmsg_debug(DEBUG_BASE, "Receiver(RESET) %u: %u -> %u", chnum,
267 mxb->mxb_rlength, mxb->mxb_rlength - len);
268
269 mxb->mxb_rlength -= len;
270
271 if ((err = pthread_cond_signal(&mxb->mxb_wait_in)) != 0) {
272 logmsg_err("Receiver(RESET) Error: cond signal: %s",
273 strerror(err));
274 mxb->mxb_state = MUX_STATE_ERROR;
275 pthread_mutex_unlock(&mxb->mxb_lock);
276 return (false);
277 }
278
279 if ((err = pthread_mutex_unlock(&mxb->mxb_lock)) != 0) {
280 logmsg_err("Receiver(RESET) Error: mutex unlock: %s",
281 strerror(err));
282 return (false);
283 }
284
285 return (true);
286 }
287