1 /*-
2 * Copyright (c) 2000-2005 MAEKAWA Masahide <maekawa@cvsync.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the author nor the names of its contributors
14 * may be used to endorse or promote products derived from this software
15 * without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33
34 #include <errno.h>
35 #include <pthread.h>
36 #include <string.h>
37
38 #include <zlib.h>
39
40 #include "compat_stdbool.h"
41 #include "compat_stdint.h"
42 #include "compat_inttypes.h"
43 #include "basedef.h"
44
45 #include "logmsg.h"
46 #include "mux.h"
47 #include "mux_zlib.h"
48 #include "network.h"
49
50 #include "receiver.h"
51
52 bool
receiver_data_zlib(struct mux * mx,uint8_t chnum)53 receiver_data_zlib(struct mux *mx, uint8_t chnum)
54 {
55 struct muxbuf *mxb = &mx->mx_buffer[MUX_IN][chnum];
56 struct mux_stream_zlib *stream = mx->mx_stream;
57 z_stream *z = &stream->ms_zstream_in;
58 uint16_t mss;
59 uint8_t *cmd = mx->mx_recvcmd;
60 size_t len, len1, len2, tail;
61 int err, zflag;
62
63 if (!sock_recv(mx->mx_socket, cmd, MUX_CMDLEN_DATA - 2)) {
64 logmsg_err("Receiver(DATA) Error: recv");
65 return (false);
66 }
67
68 mss = GetWord(cmd);
69 if ((mss == 0) || (mss > mxb->mxb_mss)) {
70 logmsg_err("Receiver(DATA) Error: invalid length: %u", mss);
71 return (false);
72 }
73
74 if (!sock_recv(mx->mx_socket, stream->ms_zbuffer_in, (size_t)mss)) {
75 logmsg_err("Receiver(DATA) Error: recv");
76 return (false);
77 }
78
79 z->next_in = stream->ms_zbuffer_in;
80 z->avail_in = mss;
81
82 do {
83 if ((err = pthread_mutex_lock(&mxb->mxb_lock)) != 0) {
84 logmsg_err("Receiver(DATA) Error: mutex lock: %s",
85 strerror(err));
86 return (false);
87 }
88 if (mxb->mxb_state != MUX_STATE_RUNNING) {
89 logmsg_err("Receiver(DATA) Error: not running: %u",
90 chnum);
91 mxb->mxb_state = MUX_STATE_ERROR;
92 pthread_mutex_unlock(&mxb->mxb_lock);
93 return (false);
94 }
95
96 while ((len = mxb->mxb_bufsize - mxb->mxb_length) == 0) {
97 logmsg_debug(DEBUG_BASE, "Receriver: Sleep(%u): %u %u",
98 chnum, mxb->mxb_length, mxb->mxb_bufsize);
99 if ((err = pthread_cond_wait(&mxb->mxb_wait_in,
100 &mxb->mxb_lock)) != 0) {
101 logmsg_err("Receiver(DATA) Error: cond_wait: "
102 "%s", strerror(err));
103 mxb->mxb_state = MUX_STATE_ERROR;
104 pthread_mutex_unlock(&mxb->mxb_lock);
105 return (false);
106 }
107 logmsg_debug(DEBUG_BASE, "Receriver: Wakeup(%u): %u",
108 chnum, mxb->mxb_length);
109 if (mxb->mxb_state != MUX_STATE_RUNNING) {
110 logmsg_err("Receiver(DATA) Error: "
111 "not running: %u", chnum);
112 mxb->mxb_state = MUX_STATE_ERROR;
113 pthread_mutex_unlock(&mxb->mxb_lock);
114 return (false);
115 }
116 }
117
118 tail = mxb->mxb_head + mxb->mxb_length;
119 if (tail >= mxb->mxb_bufsize)
120 tail -= mxb->mxb_bufsize;
121 if ((len1 = tail + len) > mxb->mxb_bufsize) {
122 len2 = len1 - mxb->mxb_bufsize;
123 len1 = len - len2;
124 zflag = 0;
125 } else {
126 len2 = 0;
127 zflag = Z_FINISH;
128 }
129
130 z->next_out = &mxb->mxb_buffer[tail];
131 z->avail_out = len1;
132 err = inflate(z, zflag);
133 if ((err != Z_STREAM_END) && (err != Z_OK)) {
134 logmsg_err("Receiver(DATA) Error: INFLATE: %s",
135 z->msg);
136 mxb->mxb_state = MUX_STATE_ERROR;
137 pthread_mutex_unlock(&mxb->mxb_lock);
138 return (false);
139 }
140
141 if ((z->avail_in != 0) && (len2 > 0)) {
142 z->next_out = mxb->mxb_buffer;
143 z->avail_out = len2;
144 err = inflate(z, Z_FINISH);
145 if ((err != Z_STREAM_END) && (err != Z_OK)) {
146 logmsg_err("Receiver(DATA) Error: INFLATE: %s",
147 z->msg);
148 mxb->mxb_state = MUX_STATE_ERROR;
149 pthread_mutex_unlock(&mxb->mxb_lock);
150 return (false);
151 }
152 }
153
154 mx->mx_xfer_in += z->total_out;
155 mxb->mxb_length += z->total_out;
156 z->total_out = 0;
157
158 if ((err = pthread_cond_signal(&mxb->mxb_wait_out)) != 0) {
159 logmsg_err("Receiver(DATA) Error: cond signal: %s",
160 strerror(err));
161 mxb->mxb_state = MUX_STATE_ERROR;
162 pthread_mutex_unlock(&mxb->mxb_lock);
163 return (false);
164 }
165
166 if ((err = pthread_mutex_unlock(&mxb->mxb_lock)) != 0) {
167 logmsg_err("Receiver(DATA) Error: mutex unlock: %s",
168 strerror(err));
169 return(false);
170 }
171 } while (z->avail_in > 0);
172
173 if (inflateReset(z) != Z_OK) {
174 logmsg_err("Receiver(DATA) Error: INFLATE(reset): %s", z->msg);
175 return (false);
176 }
177
178 return (true);
179 }
180