1 /* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "istream-private.h"
7 #include "istream-multiplex.h"
8 
/* All multiplex packets are [1 byte channel ID][4 byte big-endian length][data] */
10 
struct multiplex_istream;

/* One logical channel carried inside a multiplexed parent stream. */
struct multiplex_ichannel {
	/* Embedded stream object; container_of() on this member is how the
	   stream callbacks get back to the channel. */
	struct istream_private istream;
	/* The multiplexer this channel belongs to. */
	struct multiplex_istream *mstream;
	/* Channel ID, compared against the first byte of each packet header. */
	uint8_t cid;
	/* Number of bytes already copied into this channel's buffer past
	   istream.pos while some other channel was being read. They are handed
	   out (pos is advanced by this amount) on this channel's next read. */
	size_t pending_pos;
	/* Set when the channel is closed; payload for a closed channel is
	   skipped instead of buffered. */
	bool closed:1;
};
20 
/* State shared by all channels reading from the same parent stream. */
struct multiplex_istream {
	/* The multiplexed input. A reference is taken when the multiplexer is
	   created and dropped once no channel is registered any more. */
	struct istream *parent;

	/* channel 0 is main channel */
	/* ID taken from the most recently parsed packet header. */
	uint8_t cur_channel;
	/* Payload bytes of the current packet that have not been consumed from
	   the parent yet; 0 means the next parent bytes are a packet header. */
	unsigned int remain;
	/* Used as max_buffer_size for every channel that gets created. */
	size_t bufsize;
	/* Registered channels; a destroyed channel leaves a NULL slot. */
	ARRAY(struct multiplex_ichannel *) channels;

	/* Copied from parent->blocking at creation time. */
	bool blocking:1;
};
32 
/* Forward declaration: i_stream_multiplex_read() calls this before it is
   defined further down. */
static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream);
34 
35 static struct multiplex_ichannel *
get_channel(struct multiplex_istream * mstream,uint8_t cid)36 get_channel(struct multiplex_istream *mstream, uint8_t cid)
37 {
38 	struct multiplex_ichannel *channel;
39 	i_assert(mstream != NULL);
40 	array_foreach_elem(&mstream->channels, channel) {
41 		if (channel != NULL && channel->cid == cid)
42 			return channel;
43 	}
44 	return NULL;
45 }
46 
propagate_error(struct multiplex_istream * mstream,int stream_errno)47 static void propagate_error(struct multiplex_istream *mstream, int stream_errno)
48 {
49 	struct multiplex_ichannel *channel;
50 	array_foreach_elem(&mstream->channels, channel)
51 		if (channel != NULL)
52 			channel->istream.istream.stream_errno = stream_errno;
53 }
54 
propagate_eof(struct multiplex_istream * mstream)55 static void propagate_eof(struct multiplex_istream *mstream)
56 {
57 	struct multiplex_ichannel *channel;
58 	array_foreach_elem(&mstream->channels, channel) {
59 		if (channel == NULL)
60 			continue;
61 
62 		channel->istream.istream.eof = TRUE;
63 		if (mstream->remain > 0) {
64 			channel->istream.istream.stream_errno = EPIPE;
65 			io_stream_set_error(&channel->istream.iostream,
66 				"Unexpected EOF - %u bytes remaining in packet",
67 				mstream->remain);
68 		}
69 	}
70 }
71 
/* Demultiplex packets from the parent stream on behalf of req_channel.

   Payload addressed to req_channel is appended to its buffer and counted in
   the return value. Payload for any other open channel is copied behind that
   channel's current pos and tracked in its pending_pos, so the channel's next
   read can hand it out. Payload for unknown or closed channels is skipped.

   Returns the number of bytes added for req_channel (may be 0), -1 if there
   is no parent or the parent is closed with nothing buffered, the parent's
   read result if that read returned <= 0, 0 if another channel's buffer could
   not take its data, and -2 if req_channel's own buffer could not take more
   data and nothing was added during this call. */
static ssize_t
i_stream_multiplex_read(struct multiplex_istream *mstream,
			struct multiplex_ichannel *req_channel)
{
	const unsigned char *data;
	size_t len = 0, used, wanted, avail;
	ssize_t ret, got = 0;

	if (mstream->parent == NULL) {
		req_channel->istream.istream.eof = TRUE;
		return -1;
	}

	/* only the amount of already buffered parent data is needed here */
	(void)i_stream_get_data(mstream->parent, &len);

	if (len == 0 && mstream->parent->closed) {
		req_channel->istream.istream.eof = TRUE;
		return -1;
	}

	/* Read from the parent only when the buffered data cannot make
	   progress: no payload bytes while inside a packet, or fewer than the
	   5 header bytes while between packets. */
	if (((mstream->remain > 0 && len == 0) ||
	     (mstream->remain == 0 && len < 5)) &&
	    (ret = i_stream_read_memarea(mstream->parent)) <= 0) {
		propagate_error(mstream, mstream->parent->stream_errno);
		if (mstream->parent->eof)
			propagate_eof(mstream);
		return ret;
	}

	for(;;) {
		data = i_stream_get_data(mstream->parent, &len);
		if (len == 0) {
			if (got == 0 && mstream->blocking) {
				/* can't return 0 with blocking istreams,
				   so try again from the beginning. */
				return i_stream_multiplex_read(mstream, req_channel);
			}
			break;
		}

		if (mstream->remain > 0) {
			/* inside a packet: route its payload */
			struct multiplex_ichannel *channel =
				get_channel(mstream, mstream->cur_channel);
			wanted = I_MIN(len, mstream->remain);
			/* is it open? */
			if (channel != NULL && !channel->closed) {
				struct istream_private *stream = &channel->istream;
				/* Temporarily count the pending bytes as part of
				   the buffer for the allocation call, then restore
				   pos.
				   NOTE(review): presumably so the allocator does not
				   hand out / move the region holding pending data -
				   confirm against i_stream_try_alloc(). */
				stream->pos += channel->pending_pos;
				bool alloc_ret = i_stream_try_alloc(stream, wanted, &avail);
				stream->pos -= channel->pending_pos;
				if (!alloc_ret) {
					/* No room in the target channel. The payload
					   stays in the parent and the channel is
					   flagged as having input pending. */
					i_stream_set_input_pending(&stream->istream, TRUE);
					/* NOTE(review): this returns 0 even when got > 0,
					   i.e. after bytes were already appended to
					   req_channel in this call, and it skips the
					   error/EOF propagation below - confirm this is
					   intended. */
					if (channel->cid != req_channel->cid)
						return 0;
					if (got > 0)
						break;
					return -2;
				}

				used = I_MIN(wanted, avail);

				/* dump into buffer */
				if (channel->cid != req_channel->cid) {
					/* another channel's data: store it after its
					   pos and any earlier pending bytes */
					i_assert(stream->pos + channel->pending_pos + used <= stream->buffer_size);
					memcpy(stream->w_buffer + stream->pos + channel->pending_pos,
					       data, used);
					channel->pending_pos += used;
					i_stream_set_input_pending(&stream->istream, TRUE);
				} else {
					/* the requesting channel's data: make it
					   visible right away */
					i_assert(stream->pos + used <= stream->buffer_size);
					memcpy(stream->w_buffer + stream->pos, data, used);
					stream->pos += used;
					got += used;
				}
			} else {
				/* unknown or closed channel: drop the payload */
				used = wanted;
			}
			mstream->remain -= used;
			i_stream_skip(mstream->parent, used);
			/* see if there is more to read */
			continue;
		}
		/* always true here: the remain > 0 branch above ends with
		   continue */
		if (mstream->remain == 0) {
			/* need more data */
			if (len < 5) {
				/* Incomplete header. Go through the channel read
				   function, which either hands out pending data or
				   re-enters this function. */
				ret = i_stream_multiplex_ichannel_read(&req_channel->istream);
				if (ret > 0)
					got += ret;
				break;
			}
			/* channel ID */
			mstream->cur_channel = data[0];
			/* data length */
			mstream->remain = be32_to_cpu_unaligned(data+1);
			i_stream_skip(mstream->parent, 5);
		}
	}

	/* mirror the parent's error/EOF state onto all channels */
	propagate_error(mstream, mstream->parent->stream_errno);
	if (mstream->parent->eof)
		propagate_eof(mstream);

	return got;
}
176 
i_stream_multiplex_ichannel_read(struct istream_private * stream)177 static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream)
178 {
179 	struct multiplex_ichannel *channel =
180 		container_of(stream, struct multiplex_ichannel, istream);
181 	/* if previous multiplex read dumped data for us
182 	   actually serve it here. */
183 	if (channel->pending_pos > 0) {
184 		ssize_t ret = channel->pending_pos;
185 		stream->pos += channel->pending_pos;
186 		channel->pending_pos = 0;
187 		return ret;
188 	}
189 	return i_stream_multiplex_read(channel->mstream, channel);
190 }
191 
192 static void
i_stream_multiplex_ichannel_switch_ioloop_to(struct istream_private * stream,struct ioloop * ioloop)193 i_stream_multiplex_ichannel_switch_ioloop_to(struct istream_private *stream,
194 					     struct ioloop *ioloop)
195 {
196 	struct multiplex_ichannel *channel =
197 		container_of(stream, struct multiplex_ichannel, istream);
198 
199 	i_stream_switch_ioloop_to(channel->mstream->parent, ioloop);
200 }
201 
202 static void
i_stream_multiplex_ichannel_close(struct iostream_private * stream,bool close_parent)203 i_stream_multiplex_ichannel_close(struct iostream_private *stream, bool close_parent)
204 {
205 	struct multiplex_ichannel *arr_channel;
206 	struct multiplex_ichannel *channel =
207 		container_of(stream, struct multiplex_ichannel,
208 			     istream.iostream);
209 	channel->closed = TRUE;
210 	if (close_parent) {
211 		array_foreach_elem(&channel->mstream->channels, arr_channel)
212 			if (arr_channel != NULL && !arr_channel->closed)
213 				return;
214 		i_stream_close(channel->mstream->parent);
215 	}
216 }
217 
i_stream_multiplex_try_destroy(struct multiplex_istream * mstream)218 static void i_stream_multiplex_try_destroy(struct multiplex_istream *mstream)
219 {
220 	struct multiplex_ichannel *channel;
221 	/* can't do anything until they are all closed */
222 	array_foreach_elem(&mstream->channels, channel)
223 		if (channel != NULL)
224 			return;
225 	i_stream_unref(&mstream->parent);
226 	array_free(&mstream->channels);
227 	i_free(mstream);
228 }
229 
i_stream_multiplex_ichannel_destroy(struct iostream_private * stream)230 static void i_stream_multiplex_ichannel_destroy(struct iostream_private *stream)
231 {
232 	struct multiplex_ichannel **channelp;
233 	struct multiplex_ichannel *channel =
234 		container_of(stream, struct multiplex_ichannel,
235 			     istream.iostream);
236 	i_stream_multiplex_ichannel_close(stream, TRUE);
237 	i_stream_free_buffer(&channel->istream);
238 	array_foreach_modifiable(&channel->mstream->channels, channelp) {
239 		if (*channelp == channel) {
240 			*channelp = NULL;
241 			break;
242 		}
243 	}
244 	i_stream_multiplex_try_destroy(channel->mstream);
245 }
246 
247 static struct istream *
i_stream_add_channel_real(struct multiplex_istream * mstream,uint8_t cid)248 i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid)
249 {
250 	struct multiplex_ichannel *channel = i_new(struct multiplex_ichannel, 1);
251 	channel->cid = cid;
252 	channel->mstream = mstream;
253 	channel->istream.read = i_stream_multiplex_ichannel_read;
254 	channel->istream.switch_ioloop_to = i_stream_multiplex_ichannel_switch_ioloop_to;
255 	channel->istream.iostream.close = i_stream_multiplex_ichannel_close;
256 	channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy;
257 	channel->istream.max_buffer_size = mstream->bufsize;
258 	channel->istream.istream.blocking = mstream->blocking;
259 	if (cid == 0)
260 		channel->istream.fd = i_stream_get_fd(mstream->parent);
261 	else
262 		channel->istream.fd = -1;
263 	array_push_back(&channel->mstream->channels, &channel);
264 
265 	return i_stream_create(&channel->istream, NULL, channel->istream.fd, 0);
266 }
267 
i_stream_multiplex_add_channel(struct istream * stream,uint8_t cid)268 struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid)
269 {
270 	struct multiplex_ichannel *chan =
271 		container_of(stream->real_stream,
272 			     struct multiplex_ichannel, istream);
273 	i_assert(get_channel(chan->mstream, cid) == NULL);
274 
275 	return i_stream_add_channel_real(chan->mstream, cid);
276 }
277 
i_stream_create_multiplex(struct istream * parent,size_t bufsize)278 struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize)
279 {
280 	struct multiplex_istream *mstream;
281 
282 	mstream = i_new(struct multiplex_istream, 1);
283 	mstream->parent = parent;
284 	mstream->bufsize = bufsize;
285 	mstream->blocking = parent->blocking;
286 	i_array_init(&mstream->channels, 8);
287 	i_stream_ref(parent);
288 
289 	return i_stream_add_channel_real(mstream, 0);
290 }
291 
i_stream_multiplex_get_channel_id(struct istream * stream)292 uint8_t i_stream_multiplex_get_channel_id(struct istream *stream)
293 {
294 	struct multiplex_ichannel *channel =
295 		container_of(stream->real_stream,
296 			     struct multiplex_ichannel, istream);
297 	return channel->cid;
298 }
299