1 /* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "istream-private.h"
7 #include "istream-multiplex.h"
8
9 /* all multiplex packets are [1 byte cid][4 byte length][data] */
10
11 struct multiplex_istream;
12
13 struct multiplex_ichannel {
14 struct istream_private istream;
15 struct multiplex_istream *mstream;
16 uint8_t cid;
17 size_t pending_pos;
18 bool closed:1;
19 };
20
21 struct multiplex_istream {
22 struct istream *parent;
23
24 /* channel 0 is main channel */
25 uint8_t cur_channel;
26 unsigned int remain;
27 size_t bufsize;
28 ARRAY(struct multiplex_ichannel *) channels;
29
30 bool blocking:1;
31 };
32
33 static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream);
34
35 static struct multiplex_ichannel *
get_channel(struct multiplex_istream * mstream,uint8_t cid)36 get_channel(struct multiplex_istream *mstream, uint8_t cid)
37 {
38 struct multiplex_ichannel *channel;
39 i_assert(mstream != NULL);
40 array_foreach_elem(&mstream->channels, channel) {
41 if (channel != NULL && channel->cid == cid)
42 return channel;
43 }
44 return NULL;
45 }
46
propagate_error(struct multiplex_istream * mstream,int stream_errno)47 static void propagate_error(struct multiplex_istream *mstream, int stream_errno)
48 {
49 struct multiplex_ichannel *channel;
50 array_foreach_elem(&mstream->channels, channel)
51 if (channel != NULL)
52 channel->istream.istream.stream_errno = stream_errno;
53 }
54
propagate_eof(struct multiplex_istream * mstream)55 static void propagate_eof(struct multiplex_istream *mstream)
56 {
57 struct multiplex_ichannel *channel;
58 array_foreach_elem(&mstream->channels, channel) {
59 if (channel == NULL)
60 continue;
61
62 channel->istream.istream.eof = TRUE;
63 if (mstream->remain > 0) {
64 channel->istream.istream.stream_errno = EPIPE;
65 io_stream_set_error(&channel->istream.iostream,
66 "Unexpected EOF - %u bytes remaining in packet",
67 mstream->remain);
68 }
69 }
70 }
71
72 static ssize_t
i_stream_multiplex_read(struct multiplex_istream * mstream,struct multiplex_ichannel * req_channel)73 i_stream_multiplex_read(struct multiplex_istream *mstream,
74 struct multiplex_ichannel *req_channel)
75 {
76 const unsigned char *data;
77 size_t len = 0, used, wanted, avail;
78 ssize_t ret, got = 0;
79
80 if (mstream->parent == NULL) {
81 req_channel->istream.istream.eof = TRUE;
82 return -1;
83 }
84
85 (void)i_stream_get_data(mstream->parent, &len);
86
87 if (len == 0 && mstream->parent->closed) {
88 req_channel->istream.istream.eof = TRUE;
89 return -1;
90 }
91
92 if (((mstream->remain > 0 && len == 0) ||
93 (mstream->remain == 0 && len < 5)) &&
94 (ret = i_stream_read_memarea(mstream->parent)) <= 0) {
95 propagate_error(mstream, mstream->parent->stream_errno);
96 if (mstream->parent->eof)
97 propagate_eof(mstream);
98 return ret;
99 }
100
101 for(;;) {
102 data = i_stream_get_data(mstream->parent, &len);
103 if (len == 0) {
104 if (got == 0 && mstream->blocking) {
105 /* can't return 0 with blocking istreams,
106 so try again from the beginning. */
107 return i_stream_multiplex_read(mstream, req_channel);
108 }
109 break;
110 }
111
112 if (mstream->remain > 0) {
113 struct multiplex_ichannel *channel =
114 get_channel(mstream, mstream->cur_channel);
115 wanted = I_MIN(len, mstream->remain);
116 /* is it open? */
117 if (channel != NULL && !channel->closed) {
118 struct istream_private *stream = &channel->istream;
119 stream->pos += channel->pending_pos;
120 bool alloc_ret = i_stream_try_alloc(stream, wanted, &avail);
121 stream->pos -= channel->pending_pos;
122 if (!alloc_ret) {
123 i_stream_set_input_pending(&stream->istream, TRUE);
124 if (channel->cid != req_channel->cid)
125 return 0;
126 if (got > 0)
127 break;
128 return -2;
129 }
130
131 used = I_MIN(wanted, avail);
132
133 /* dump into buffer */
134 if (channel->cid != req_channel->cid) {
135 i_assert(stream->pos + channel->pending_pos + used <= stream->buffer_size);
136 memcpy(stream->w_buffer + stream->pos + channel->pending_pos,
137 data, used);
138 channel->pending_pos += used;
139 i_stream_set_input_pending(&stream->istream, TRUE);
140 } else {
141 i_assert(stream->pos + used <= stream->buffer_size);
142 memcpy(stream->w_buffer + stream->pos, data, used);
143 stream->pos += used;
144 got += used;
145 }
146 } else {
147 used = wanted;
148 }
149 mstream->remain -= used;
150 i_stream_skip(mstream->parent, used);
151 /* see if there is more to read */
152 continue;
153 }
154 if (mstream->remain == 0) {
155 /* need more data */
156 if (len < 5) {
157 ret = i_stream_multiplex_ichannel_read(&req_channel->istream);
158 if (ret > 0)
159 got += ret;
160 break;
161 }
162 /* channel ID */
163 mstream->cur_channel = data[0];
164 /* data length */
165 mstream->remain = be32_to_cpu_unaligned(data+1);
166 i_stream_skip(mstream->parent, 5);
167 }
168 }
169
170 propagate_error(mstream, mstream->parent->stream_errno);
171 if (mstream->parent->eof)
172 propagate_eof(mstream);
173
174 return got;
175 }
176
i_stream_multiplex_ichannel_read(struct istream_private * stream)177 static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream)
178 {
179 struct multiplex_ichannel *channel =
180 container_of(stream, struct multiplex_ichannel, istream);
181 /* if previous multiplex read dumped data for us
182 actually serve it here. */
183 if (channel->pending_pos > 0) {
184 ssize_t ret = channel->pending_pos;
185 stream->pos += channel->pending_pos;
186 channel->pending_pos = 0;
187 return ret;
188 }
189 return i_stream_multiplex_read(channel->mstream, channel);
190 }
191
192 static void
i_stream_multiplex_ichannel_switch_ioloop_to(struct istream_private * stream,struct ioloop * ioloop)193 i_stream_multiplex_ichannel_switch_ioloop_to(struct istream_private *stream,
194 struct ioloop *ioloop)
195 {
196 struct multiplex_ichannel *channel =
197 container_of(stream, struct multiplex_ichannel, istream);
198
199 i_stream_switch_ioloop_to(channel->mstream->parent, ioloop);
200 }
201
202 static void
i_stream_multiplex_ichannel_close(struct iostream_private * stream,bool close_parent)203 i_stream_multiplex_ichannel_close(struct iostream_private *stream, bool close_parent)
204 {
205 struct multiplex_ichannel *arr_channel;
206 struct multiplex_ichannel *channel =
207 container_of(stream, struct multiplex_ichannel,
208 istream.iostream);
209 channel->closed = TRUE;
210 if (close_parent) {
211 array_foreach_elem(&channel->mstream->channels, arr_channel)
212 if (arr_channel != NULL && !arr_channel->closed)
213 return;
214 i_stream_close(channel->mstream->parent);
215 }
216 }
217
i_stream_multiplex_try_destroy(struct multiplex_istream * mstream)218 static void i_stream_multiplex_try_destroy(struct multiplex_istream *mstream)
219 {
220 struct multiplex_ichannel *channel;
221 /* can't do anything until they are all closed */
222 array_foreach_elem(&mstream->channels, channel)
223 if (channel != NULL)
224 return;
225 i_stream_unref(&mstream->parent);
226 array_free(&mstream->channels);
227 i_free(mstream);
228 }
229
i_stream_multiplex_ichannel_destroy(struct iostream_private * stream)230 static void i_stream_multiplex_ichannel_destroy(struct iostream_private *stream)
231 {
232 struct multiplex_ichannel **channelp;
233 struct multiplex_ichannel *channel =
234 container_of(stream, struct multiplex_ichannel,
235 istream.iostream);
236 i_stream_multiplex_ichannel_close(stream, TRUE);
237 i_stream_free_buffer(&channel->istream);
238 array_foreach_modifiable(&channel->mstream->channels, channelp) {
239 if (*channelp == channel) {
240 *channelp = NULL;
241 break;
242 }
243 }
244 i_stream_multiplex_try_destroy(channel->mstream);
245 }
246
247 static struct istream *
i_stream_add_channel_real(struct multiplex_istream * mstream,uint8_t cid)248 i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid)
249 {
250 struct multiplex_ichannel *channel = i_new(struct multiplex_ichannel, 1);
251 channel->cid = cid;
252 channel->mstream = mstream;
253 channel->istream.read = i_stream_multiplex_ichannel_read;
254 channel->istream.switch_ioloop_to = i_stream_multiplex_ichannel_switch_ioloop_to;
255 channel->istream.iostream.close = i_stream_multiplex_ichannel_close;
256 channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy;
257 channel->istream.max_buffer_size = mstream->bufsize;
258 channel->istream.istream.blocking = mstream->blocking;
259 if (cid == 0)
260 channel->istream.fd = i_stream_get_fd(mstream->parent);
261 else
262 channel->istream.fd = -1;
263 array_push_back(&channel->mstream->channels, &channel);
264
265 return i_stream_create(&channel->istream, NULL, channel->istream.fd, 0);
266 }
267
i_stream_multiplex_add_channel(struct istream * stream,uint8_t cid)268 struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid)
269 {
270 struct multiplex_ichannel *chan =
271 container_of(stream->real_stream,
272 struct multiplex_ichannel, istream);
273 i_assert(get_channel(chan->mstream, cid) == NULL);
274
275 return i_stream_add_channel_real(chan->mstream, cid);
276 }
277
i_stream_create_multiplex(struct istream * parent,size_t bufsize)278 struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize)
279 {
280 struct multiplex_istream *mstream;
281
282 mstream = i_new(struct multiplex_istream, 1);
283 mstream->parent = parent;
284 mstream->bufsize = bufsize;
285 mstream->blocking = parent->blocking;
286 i_array_init(&mstream->channels, 8);
287 i_stream_ref(parent);
288
289 return i_stream_add_channel_real(mstream, 0);
290 }
291
i_stream_multiplex_get_channel_id(struct istream * stream)292 uint8_t i_stream_multiplex_get_channel_id(struct istream *stream)
293 {
294 struct multiplex_ichannel *channel =
295 container_of(stream->real_stream,
296 struct multiplex_ichannel, istream);
297 return channel->cid;
298 }
299