1 /* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "test-lib.h"
4 #include "ioloop.h"
5 #include "str.h"
6 #include "istream.h"
7 #include "ostream-private.h"
8 #include "istream-multiplex.h"
9 #include "ostream-multiplex.h"
10 #include "ostream.h"
11 #include <unistd.h>
12 
13 #include "hex-binary.h"
14 
test_ostream_multiplex_simple(void)15 static void test_ostream_multiplex_simple(void)
16 {
17 	test_begin("ostream multiplex (simple)");
18 
19 	const unsigned char expected[] = {
20 		'\x00','\x00','\x00','\x00','\x05','\x68','\x65',
21 		'\x6c','\x6c','\x6f','\x01','\x00','\x00','\x00',
22 		'\x05','\x77','\x6f','\x72','\x6c','\x64'
23 	};
24 
25 	buffer_t *result = t_str_new(64);
26 	struct ostream *os = test_ostream_create(result);
27 	struct ostream *os2 = o_stream_create_multiplex(os, SIZE_MAX);
28 	struct ostream *os3 = o_stream_multiplex_add_channel(os2, 1);
29 
30 	test_assert(o_stream_send_str(os2, "hello") == 5);
31 	test_assert(o_stream_send_str(os3, "world") == 5);
32 
33 	o_stream_unref(&os3);
34 	o_stream_unref(&os2);
35 
36 	test_assert(o_stream_finish(os) == 1);
37 	o_stream_unref(&os);
38 
39 	test_assert(sizeof(expected) == result->used);
40 	test_assert(memcmp(result->data, expected, I_MIN(sizeof(expected),
41 		    result->used)) == 0);
42 
43 	test_end();
44 }
45 
46 static unsigned int channel_counter[2] = {0, 0};
47 static struct ostream *chan0, *chan1;
48 
49 static const char *msgs[] = {
50 	"",
51 	"a",
52 	"bb",
53 	"ccc",
54 	"dddd",
55 	"eeeee",
56 	"ffffff"
57 };
58 
test_ostream_multiplex_stream_read(struct istream * is)59 static void test_ostream_multiplex_stream_read(struct istream *is)
60 {
61 	uint8_t cid;
62 	const unsigned char *data;
63 	size_t siz,dlen=0,pos=0;
64 
65 	if (i_stream_read_more(is, &data, &siz)>0) {
66 		/* parse stream */
67 		for(;pos<siz;) {
68 			if (dlen > 0) {
69 				if (dlen < N_ELEMENTS(msgs)) {
70 					test_assert_idx(memcmp(&data[pos],
71 							       msgs[dlen], dlen)==0,
72 							channel_counter[data[0] % 2]);
73 				}
74 				channel_counter[data[0] % 2]++;
75 				pos += dlen;
76 				dlen = 0;
77 			} else if (dlen == 0) {
78 				cid = data[pos] % 2;
79 				test_assert_idx(data[pos] < 2, channel_counter[cid]);
80 				pos++;
81 				dlen = be32_to_cpu_unaligned(&data[pos]);
82 				pos += 4;
83 				test_assert(dlen > 0 && dlen < N_ELEMENTS(msgs));
84 			}
85 		}
86 		i_stream_skip(is, siz);
87 	}
88 
89 	if (channel_counter[0] > 100 && channel_counter[1] > 100)
90 		io_loop_stop(current_ioloop);
91 }
92 
test_ostream_multiplex_stream_write(struct ostream * channel ATTR_UNUSED)93 static void test_ostream_multiplex_stream_write(struct ostream *channel ATTR_UNUSED)
94 {
95 	size_t rounds = 1 + i_rand_limit(10);
96 	for(size_t i = 0; i < rounds; i++) {
97 		if ((i_rand_limit(2)) != 0) {
98 			o_stream_cork(chan1);
99 			/* send one byte at a time */
100 			for(const char *p = msgs[i_rand_limit(N_ELEMENTS(msgs))];
101 			    *p != '\0'; p++) {
102 				o_stream_nsend(chan1, p, 1);
103 			}
104 			o_stream_uncork(chan1);
105 		} else {
106 			o_stream_nsend_str(chan0,
107 					   msgs[i_rand_limit(N_ELEMENTS(msgs))]);
108 		}
109 	}
110 }
111 
test_ostream_multiplex_stream(void)112 static void test_ostream_multiplex_stream(void)
113 {
114 	test_begin("ostream multiplex (stream)");
115 
116 	struct ioloop *ioloop = io_loop_create();
117 	io_loop_set_current(ioloop);
118 
119 	int fds[2];
120 	test_assert(pipe(fds) == 0);
121 	fd_set_nonblock(fds[0], TRUE);
122 	fd_set_nonblock(fds[1], TRUE);
123 	struct ostream *os = o_stream_create_fd(fds[1], SIZE_MAX);
124 	struct istream *is = i_stream_create_fd(fds[0], SIZE_MAX);
125 
126 	chan0 = o_stream_create_multiplex(os, SIZE_MAX);
127 	chan1 = o_stream_multiplex_add_channel(chan0, 1);
128 
129 	struct io *io0 =
130 		io_add_istream(is, test_ostream_multiplex_stream_read, is);
131 	struct io *io1 =
132 		io_add(fds[1], IO_WRITE, test_ostream_multiplex_stream_write, os);
133 
134 	io_loop_run(current_ioloop);
135 
136 	io_remove(&io0);
137 	io_remove(&io1);
138 
139 	test_assert(o_stream_finish(chan1) > 0);
140 	o_stream_unref(&chan1);
141 	test_assert(o_stream_finish(chan0) > 0);
142 	o_stream_unref(&chan0);
143 
144 	i_stream_unref(&is);
145 	o_stream_unref(&os);
146 
147 	io_loop_destroy(&ioloop);
148 
149 	i_close_fd(&fds[0]);
150 	i_close_fd(&fds[1]);
151 
152 	test_end();
153 }
154 
test_ostream_multiplex_cork(void)155 static void test_ostream_multiplex_cork(void)
156 {
157 	test_begin("ostream multiplex (corking)");
158 	buffer_t *output = t_buffer_create(128);
159 	struct ostream *os = test_ostream_create(output);
160 	struct ostream *chan0 = o_stream_create_multiplex(os, SIZE_MAX);
161 
162 	const struct const_iovec iov[] = {
163 		{ "hello", 5 },
164 		{ " ", 1 },
165 		{ "world", 5 },
166 		{ "!", 1 }
167 	};
168 
169 	/* send data in parts, expect to see single blob */
170 	o_stream_cork(chan0);
171 	o_stream_nsendv(chan0, iov, N_ELEMENTS(iov));
172 	o_stream_uncork(chan0);
173 	test_assert(o_stream_flush(os) == 1);
174 
175 	/* check output */
176 	test_assert(memcmp(output->data, "\0\0\0\0\f", 5) == 0);
177 	test_assert(strcmp(str_c(output)+5, "hello world!") == 0);
178 
179 	test_assert(o_stream_finish(chan0) > 0);
180 	o_stream_unref(&chan0);
181 	o_stream_unref(&os);
182 
183 	test_end();
184 }
185 
186 struct test_hang_context {
187 	struct istream *input1, *input2;
188 	size_t sent_bytes, sent2_bytes;
189 	size_t read_bytes, read2_bytes;
190 };
191 
test_hang_input(struct test_hang_context * ctx)192 static void test_hang_input(struct test_hang_context *ctx)
193 {
194 	ssize_t ret, ret2;
195 
196 	do {
197 		ret = i_stream_read(ctx->input1);
198 		if (ret > 0) {
199 			i_stream_skip(ctx->input1, ret);
200 			ctx->read_bytes += ret;
201 		}
202 		ret2 = i_stream_read(ctx->input2);
203 		if (ret2 > 0) {
204 			i_stream_skip(ctx->input2, ret2);
205 			ctx->read2_bytes += ret2;
206 		}
207 	} while (ret > 0 || ret2 > 0);
208 
209 	test_assert(ret == 0 && ret2 == 0);
210 	if (ctx->read_bytes == ctx->sent_bytes &&
211 	    ctx->read2_bytes == ctx->sent2_bytes)
212 		io_loop_stop(current_ioloop);
213 }
214 
test_ostream_multiplex_hang(void)215 static void test_ostream_multiplex_hang(void)
216 {
217 	int fd[2];
218 
219 	test_begin("ostream multiplex hang");
220 	if (pipe(fd) < 0)
221 		i_fatal("pipe() failed: %m");
222 	fd_set_nonblock(fd[0], TRUE);
223 	fd_set_nonblock(fd[1], TRUE);
224 
225 	struct ioloop *ioloop = io_loop_create();
226 	struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
227 	o_stream_set_no_error_handling(file_output, TRUE);
228 	struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
229 	struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
230 	char buf[256];
231 
232 	/* send multiplex output until the buffer is full */
233 	ssize_t ret, ret2;
234 	size_t sent_bytes = 0, sent2_bytes = 0;
235 	i_zero(&buf);
236 	o_stream_cork(channel);
237 	o_stream_cork(channel2);
238 	while ((ret = o_stream_send(channel, buf, sizeof(buf))) > 0) {
239 		sent_bytes += ret;
240 		ret2 = o_stream_send(channel2, buf, sizeof(buf));
241 		if (ret2 <= 0)
242 			break;
243 		sent2_bytes += ret2;
244 	}
245 	test_assert(o_stream_finish(channel) == 0);
246 	test_assert(o_stream_finish(channel2) == 0);
247 	o_stream_uncork(channel);
248 	o_stream_uncork(channel2);
249 	/* We expect the first channel to have data buffered */
250 	test_assert(o_stream_get_buffer_used_size(channel) >=
251 		    o_stream_get_buffer_used_size(file_output));
252 	test_assert(o_stream_get_buffer_used_size(channel) -
253 		    o_stream_get_buffer_used_size(file_output) > 0);
254 
255 	/* read everything that was already sent */
256 	struct istream *file_input = i_stream_create_fd(fd[0], 1024);
257 	struct istream *input = i_stream_create_multiplex(file_input, 4096);
258 	struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
259 
260 	struct test_hang_context ctx = {
261 		.input1 = input,
262 		.input2 = input2,
263 		.sent_bytes = sent_bytes,
264 		.sent2_bytes = sent2_bytes,
265 	};
266 
267 	struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
268 	struct io *io = io_add_istream(file_input, test_hang_input, &ctx);
269 	io_loop_run(ioloop);
270 	io_remove(&io);
271 	timeout_remove(&to);
272 
273 	/* everything that was sent should have been received now.
274 	   ostream-multiplex's internal buffer is also supposed to have
275 	   been sent. */
276 	test_assert(input->v_offset == sent_bytes);
277 	test_assert(input2->v_offset == sent2_bytes);
278 	test_assert(o_stream_get_buffer_used_size(channel) == 0);
279 	test_assert(o_stream_get_buffer_used_size(channel2) == 0);
280 
281 	i_stream_unref(&file_input);
282 	i_stream_unref(&input);
283 	i_stream_unref(&input2);
284 	o_stream_unref(&channel);
285 	o_stream_unref(&channel2);
286 	o_stream_unref(&file_output);
287 	io_loop_destroy(&ioloop);
288 	test_end();
289 }
290 
291 #define FLUSH_CALLBACK_TOTAL_BYTES 10240
292 
293 struct test_flush_context {
294 	struct ostream *output1, *output2;
295 	struct istream *input1, *input2;
296 };
297 
flush_callback1(struct test_flush_context * ctx)298 static int flush_callback1(struct test_flush_context *ctx)
299 {
300 	char buf[32];
301 
302 	i_assert(ctx->output1->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
303 	size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output1->offset;
304 
305 	memset(buf, '1', sizeof(buf));
306 	if (o_stream_send(ctx->output1, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
307 		return -1;
308 	return ctx->output1->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
309 }
310 
flush_callback2(struct test_flush_context * ctx)311 static int flush_callback2(struct test_flush_context *ctx)
312 {
313 	char buf[64];
314 
315 	i_assert(ctx->output2->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
316 	size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output2->offset;
317 
318 	memset(buf, '2', sizeof(buf));
319 	if (o_stream_send(ctx->output2, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
320 		return -1;
321 	return ctx->output2->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
322 }
323 
test_flush_input(struct test_flush_context * ctx)324 static void test_flush_input(struct test_flush_context *ctx)
325 {
326 	ssize_t ret, ret2;
327 
328 	do {
329 		ret = i_stream_read(ctx->input1);
330 		if (ret > 0)
331 			i_stream_skip(ctx->input1, ret);
332 		ret2 = i_stream_read(ctx->input2);
333 		if (ret2 > 0)
334 			i_stream_skip(ctx->input2, ret2);
335 	} while (ret > 0 || ret2 > 0);
336 
337 	test_assert(ret == 0 && ret2 == 0);
338 	if (ctx->input1->v_offset == FLUSH_CALLBACK_TOTAL_BYTES &&
339 	    ctx->input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES)
340 		io_loop_stop(current_ioloop);
341 }
342 
test_ostream_multiplex_flush_callback(void)343 static void test_ostream_multiplex_flush_callback(void)
344 {
345 	int fd[2];
346 
347 	test_begin("ostream multiplex flush callback");
348 	if (pipe(fd) < 0)
349 		i_fatal("pipe() failed: %m");
350 	fd_set_nonblock(fd[0], TRUE);
351 	fd_set_nonblock(fd[1], TRUE);
352 
353 	struct ioloop *ioloop = io_loop_create();
354 	struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
355 	o_stream_set_no_error_handling(file_output, TRUE);
356 	struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
357 	struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
358 
359 	struct istream *file_input = i_stream_create_fd(fd[0], 1024);
360 	struct istream *input = i_stream_create_multiplex(file_input, 4096);
361 	struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
362 
363 	struct test_flush_context ctx = {
364 		.output1 = channel,
365 		.output2 = channel2,
366 		.input1 = input,
367 		.input2 = input2,
368 	};
369 	o_stream_set_flush_callback(channel, flush_callback1, &ctx);
370 	o_stream_set_flush_callback(channel2, flush_callback2, &ctx);
371 	o_stream_set_flush_pending(channel, TRUE);
372 	o_stream_set_flush_pending(channel2, TRUE);
373 
374 	struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
375 	struct io *io = io_add_istream(file_input, test_flush_input, &ctx);
376 	io_loop_run(ioloop);
377 	io_remove(&io);
378 	timeout_remove(&to);
379 
380 	test_assert(channel->offset == FLUSH_CALLBACK_TOTAL_BYTES);
381 	test_assert(channel2->offset == FLUSH_CALLBACK_TOTAL_BYTES);
382 	test_assert(input->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
383 	test_assert(input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
384 
385 	test_assert(o_stream_finish(channel) == 1);
386 	test_assert(o_stream_finish(channel2) == 1);
387 
388 	i_stream_unref(&file_input);
389 	i_stream_unref(&input);
390 	i_stream_unref(&input2);
391 	o_stream_unref(&channel);
392 	o_stream_unref(&channel2);
393 	o_stream_unref(&file_output);
394 	io_loop_destroy(&ioloop);
395 	test_end();
396 }
397 
test_ostream_multiplex(void)398 void test_ostream_multiplex(void)
399 {
400 	test_ostream_multiplex_simple();
401 	test_ostream_multiplex_stream();
402 	test_ostream_multiplex_cork();
403 	test_ostream_multiplex_hang();
404 	test_ostream_multiplex_flush_callback();
405 }
406