1 /* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */
2
3 #include "test-lib.h"
4 #include "ioloop.h"
5 #include "str.h"
6 #include "istream.h"
7 #include "ostream-private.h"
8 #include "istream-multiplex.h"
9 #include "ostream-multiplex.h"
10 #include "ostream.h"
11 #include <unistd.h>
12
13 #include "hex-binary.h"
14
test_ostream_multiplex_simple(void)15 static void test_ostream_multiplex_simple(void)
16 {
17 test_begin("ostream multiplex (simple)");
18
19 const unsigned char expected[] = {
20 '\x00','\x00','\x00','\x00','\x05','\x68','\x65',
21 '\x6c','\x6c','\x6f','\x01','\x00','\x00','\x00',
22 '\x05','\x77','\x6f','\x72','\x6c','\x64'
23 };
24
25 buffer_t *result = t_str_new(64);
26 struct ostream *os = test_ostream_create(result);
27 struct ostream *os2 = o_stream_create_multiplex(os, SIZE_MAX);
28 struct ostream *os3 = o_stream_multiplex_add_channel(os2, 1);
29
30 test_assert(o_stream_send_str(os2, "hello") == 5);
31 test_assert(o_stream_send_str(os3, "world") == 5);
32
33 o_stream_unref(&os3);
34 o_stream_unref(&os2);
35
36 test_assert(o_stream_finish(os) == 1);
37 o_stream_unref(&os);
38
39 test_assert(sizeof(expected) == result->used);
40 test_assert(memcmp(result->data, expected, I_MIN(sizeof(expected),
41 result->used)) == 0);
42
43 test_end();
44 }
45
/* Number of packets the stream-test reader has parsed, per channel id. */
static unsigned int channel_counter[2] = { 0, 0 };

/* The two multiplex channels the stream-test writer sends to. */
static struct ostream *chan0;
static struct ostream *chan1;

/* Payloads used by the stream test. msgs[n] is exactly n bytes long, which
   lets the reader pick the expected text from the packet length alone. */
static const char *msgs[] = {
	"",
	"a",
	"bb",
	"ccc",
	"dddd",
	"eeeee",
	"ffffff"
};
58
test_ostream_multiplex_stream_read(struct istream * is)59 static void test_ostream_multiplex_stream_read(struct istream *is)
60 {
61 uint8_t cid;
62 const unsigned char *data;
63 size_t siz,dlen=0,pos=0;
64
65 if (i_stream_read_more(is, &data, &siz)>0) {
66 /* parse stream */
67 for(;pos<siz;) {
68 if (dlen > 0) {
69 if (dlen < N_ELEMENTS(msgs)) {
70 test_assert_idx(memcmp(&data[pos],
71 msgs[dlen], dlen)==0,
72 channel_counter[data[0] % 2]);
73 }
74 channel_counter[data[0] % 2]++;
75 pos += dlen;
76 dlen = 0;
77 } else if (dlen == 0) {
78 cid = data[pos] % 2;
79 test_assert_idx(data[pos] < 2, channel_counter[cid]);
80 pos++;
81 dlen = be32_to_cpu_unaligned(&data[pos]);
82 pos += 4;
83 test_assert(dlen > 0 && dlen < N_ELEMENTS(msgs));
84 }
85 }
86 i_stream_skip(is, siz);
87 }
88
89 if (channel_counter[0] > 100 && channel_counter[1] > 100)
90 io_loop_stop(current_ioloop);
91 }
92
/* Write callback for the "stream" test. Sends between 1 and 10 randomly
   chosen entries of msgs[]; for each one a coin flip decides whether it goes
   to chan0 in a single call or to chan1 one byte at a time while corked.
   The `channel` argument is unused. */
static void test_ostream_multiplex_stream_write(struct ostream *channel ATTR_UNUSED)
{
	size_t remaining = 1 + i_rand_limit(10);

	while (remaining-- > 0) {
		if (i_rand_limit(2) == 0) {
			/* whole message in one send */
			o_stream_nsend_str(chan0,
					   msgs[i_rand_limit(N_ELEMENTS(msgs))]);
			continue;
		}

		o_stream_cork(chan1);
		/* send one byte at a time */
		const char *cursor = msgs[i_rand_limit(N_ELEMENTS(msgs))];
		while (*cursor != '\0') {
			o_stream_nsend(chan1, cursor, 1);
			cursor++;
		}
		o_stream_uncork(chan1);
	}
}
111
test_ostream_multiplex_stream(void)112 static void test_ostream_multiplex_stream(void)
113 {
114 test_begin("ostream multiplex (stream)");
115
116 struct ioloop *ioloop = io_loop_create();
117 io_loop_set_current(ioloop);
118
119 int fds[2];
120 test_assert(pipe(fds) == 0);
121 fd_set_nonblock(fds[0], TRUE);
122 fd_set_nonblock(fds[1], TRUE);
123 struct ostream *os = o_stream_create_fd(fds[1], SIZE_MAX);
124 struct istream *is = i_stream_create_fd(fds[0], SIZE_MAX);
125
126 chan0 = o_stream_create_multiplex(os, SIZE_MAX);
127 chan1 = o_stream_multiplex_add_channel(chan0, 1);
128
129 struct io *io0 =
130 io_add_istream(is, test_ostream_multiplex_stream_read, is);
131 struct io *io1 =
132 io_add(fds[1], IO_WRITE, test_ostream_multiplex_stream_write, os);
133
134 io_loop_run(current_ioloop);
135
136 io_remove(&io0);
137 io_remove(&io1);
138
139 test_assert(o_stream_finish(chan1) > 0);
140 o_stream_unref(&chan1);
141 test_assert(o_stream_finish(chan0) > 0);
142 o_stream_unref(&chan0);
143
144 i_stream_unref(&is);
145 o_stream_unref(&os);
146
147 io_loop_destroy(&ioloop);
148
149 i_close_fd(&fds[0]);
150 i_close_fd(&fds[1]);
151
152 test_end();
153 }
154
test_ostream_multiplex_cork(void)155 static void test_ostream_multiplex_cork(void)
156 {
157 test_begin("ostream multiplex (corking)");
158 buffer_t *output = t_buffer_create(128);
159 struct ostream *os = test_ostream_create(output);
160 struct ostream *chan0 = o_stream_create_multiplex(os, SIZE_MAX);
161
162 const struct const_iovec iov[] = {
163 { "hello", 5 },
164 { " ", 1 },
165 { "world", 5 },
166 { "!", 1 }
167 };
168
169 /* send data in parts, expect to see single blob */
170 o_stream_cork(chan0);
171 o_stream_nsendv(chan0, iov, N_ELEMENTS(iov));
172 o_stream_uncork(chan0);
173 test_assert(o_stream_flush(os) == 1);
174
175 /* check output */
176 test_assert(memcmp(output->data, "\0\0\0\0\f", 5) == 0);
177 test_assert(strcmp(str_c(output)+5, "hello world!") == 0);
178
179 test_assert(o_stream_finish(chan0) > 0);
180 o_stream_unref(&chan0);
181 o_stream_unref(&os);
182
183 test_end();
184 }
185
/* State shared between test_ostream_multiplex_hang() and its input
   callback. */
struct test_hang_context {
	/* multiplexed input channels being read */
	struct istream *input1;
	struct istream *input2;
	/* bytes the test accepted for sending on each channel */
	size_t sent_bytes;
	size_t sent2_bytes;
	/* bytes the input callback has read back on each channel so far */
	size_t read_bytes;
	size_t read2_bytes;
};
191
test_hang_input(struct test_hang_context * ctx)192 static void test_hang_input(struct test_hang_context *ctx)
193 {
194 ssize_t ret, ret2;
195
196 do {
197 ret = i_stream_read(ctx->input1);
198 if (ret > 0) {
199 i_stream_skip(ctx->input1, ret);
200 ctx->read_bytes += ret;
201 }
202 ret2 = i_stream_read(ctx->input2);
203 if (ret2 > 0) {
204 i_stream_skip(ctx->input2, ret2);
205 ctx->read2_bytes += ret2;
206 }
207 } while (ret > 0 || ret2 > 0);
208
209 test_assert(ret == 0 && ret2 == 0);
210 if (ctx->read_bytes == ctx->sent_bytes &&
211 ctx->read2_bytes == ctx->sent2_bytes)
212 io_loop_stop(current_ioloop);
213 }
214
test_ostream_multiplex_hang(void)215 static void test_ostream_multiplex_hang(void)
216 {
217 int fd[2];
218
219 test_begin("ostream multiplex hang");
220 if (pipe(fd) < 0)
221 i_fatal("pipe() failed: %m");
222 fd_set_nonblock(fd[0], TRUE);
223 fd_set_nonblock(fd[1], TRUE);
224
225 struct ioloop *ioloop = io_loop_create();
226 struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
227 o_stream_set_no_error_handling(file_output, TRUE);
228 struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
229 struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
230 char buf[256];
231
232 /* send multiplex output until the buffer is full */
233 ssize_t ret, ret2;
234 size_t sent_bytes = 0, sent2_bytes = 0;
235 i_zero(&buf);
236 o_stream_cork(channel);
237 o_stream_cork(channel2);
238 while ((ret = o_stream_send(channel, buf, sizeof(buf))) > 0) {
239 sent_bytes += ret;
240 ret2 = o_stream_send(channel2, buf, sizeof(buf));
241 if (ret2 <= 0)
242 break;
243 sent2_bytes += ret2;
244 }
245 test_assert(o_stream_finish(channel) == 0);
246 test_assert(o_stream_finish(channel2) == 0);
247 o_stream_uncork(channel);
248 o_stream_uncork(channel2);
249 /* We expect the first channel to have data buffered */
250 test_assert(o_stream_get_buffer_used_size(channel) >=
251 o_stream_get_buffer_used_size(file_output));
252 test_assert(o_stream_get_buffer_used_size(channel) -
253 o_stream_get_buffer_used_size(file_output) > 0);
254
255 /* read everything that was already sent */
256 struct istream *file_input = i_stream_create_fd(fd[0], 1024);
257 struct istream *input = i_stream_create_multiplex(file_input, 4096);
258 struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
259
260 struct test_hang_context ctx = {
261 .input1 = input,
262 .input2 = input2,
263 .sent_bytes = sent_bytes,
264 .sent2_bytes = sent2_bytes,
265 };
266
267 struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
268 struct io *io = io_add_istream(file_input, test_hang_input, &ctx);
269 io_loop_run(ioloop);
270 io_remove(&io);
271 timeout_remove(&to);
272
273 /* everything that was sent should have been received now.
274 ostream-multiplex's internal buffer is also supposed to have
275 been sent. */
276 test_assert(input->v_offset == sent_bytes);
277 test_assert(input2->v_offset == sent2_bytes);
278 test_assert(o_stream_get_buffer_used_size(channel) == 0);
279 test_assert(o_stream_get_buffer_used_size(channel2) == 0);
280
281 i_stream_unref(&file_input);
282 i_stream_unref(&input);
283 i_stream_unref(&input2);
284 o_stream_unref(&channel);
285 o_stream_unref(&channel2);
286 o_stream_unref(&file_output);
287 io_loop_destroy(&ioloop);
288 test_end();
289 }
290
/* Number of bytes each flush callback sends on its channel before it reports
   completion. */
#define FLUSH_CALLBACK_TOTAL_BYTES 10240

/* State shared between the flush-callback test, its two flush callbacks and
   its input callback. */
struct test_flush_context {
	/* multiplexed output channels written by the flush callbacks */
	struct ostream *output1;
	struct ostream *output2;
	/* multiplexed input channels drained by the input callback */
	struct istream *input1;
	struct istream *input2;
};
297
flush_callback1(struct test_flush_context * ctx)298 static int flush_callback1(struct test_flush_context *ctx)
299 {
300 char buf[32];
301
302 i_assert(ctx->output1->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
303 size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output1->offset;
304
305 memset(buf, '1', sizeof(buf));
306 if (o_stream_send(ctx->output1, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
307 return -1;
308 return ctx->output1->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
309 }
310
flush_callback2(struct test_flush_context * ctx)311 static int flush_callback2(struct test_flush_context *ctx)
312 {
313 char buf[64];
314
315 i_assert(ctx->output2->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
316 size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output2->offset;
317
318 memset(buf, '2', sizeof(buf));
319 if (o_stream_send(ctx->output2, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
320 return -1;
321 return ctx->output2->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
322 }
323
test_flush_input(struct test_flush_context * ctx)324 static void test_flush_input(struct test_flush_context *ctx)
325 {
326 ssize_t ret, ret2;
327
328 do {
329 ret = i_stream_read(ctx->input1);
330 if (ret > 0)
331 i_stream_skip(ctx->input1, ret);
332 ret2 = i_stream_read(ctx->input2);
333 if (ret2 > 0)
334 i_stream_skip(ctx->input2, ret2);
335 } while (ret > 0 || ret2 > 0);
336
337 test_assert(ret == 0 && ret2 == 0);
338 if (ctx->input1->v_offset == FLUSH_CALLBACK_TOTAL_BYTES &&
339 ctx->input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES)
340 io_loop_stop(current_ioloop);
341 }
342
test_ostream_multiplex_flush_callback(void)343 static void test_ostream_multiplex_flush_callback(void)
344 {
345 int fd[2];
346
347 test_begin("ostream multiplex flush callback");
348 if (pipe(fd) < 0)
349 i_fatal("pipe() failed: %m");
350 fd_set_nonblock(fd[0], TRUE);
351 fd_set_nonblock(fd[1], TRUE);
352
353 struct ioloop *ioloop = io_loop_create();
354 struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
355 o_stream_set_no_error_handling(file_output, TRUE);
356 struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
357 struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
358
359 struct istream *file_input = i_stream_create_fd(fd[0], 1024);
360 struct istream *input = i_stream_create_multiplex(file_input, 4096);
361 struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
362
363 struct test_flush_context ctx = {
364 .output1 = channel,
365 .output2 = channel2,
366 .input1 = input,
367 .input2 = input2,
368 };
369 o_stream_set_flush_callback(channel, flush_callback1, &ctx);
370 o_stream_set_flush_callback(channel2, flush_callback2, &ctx);
371 o_stream_set_flush_pending(channel, TRUE);
372 o_stream_set_flush_pending(channel2, TRUE);
373
374 struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
375 struct io *io = io_add_istream(file_input, test_flush_input, &ctx);
376 io_loop_run(ioloop);
377 io_remove(&io);
378 timeout_remove(&to);
379
380 test_assert(channel->offset == FLUSH_CALLBACK_TOTAL_BYTES);
381 test_assert(channel2->offset == FLUSH_CALLBACK_TOTAL_BYTES);
382 test_assert(input->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
383 test_assert(input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
384
385 test_assert(o_stream_finish(channel) == 1);
386 test_assert(o_stream_finish(channel2) == 1);
387
388 i_stream_unref(&file_input);
389 i_stream_unref(&input);
390 i_stream_unref(&input2);
391 o_stream_unref(&channel);
392 o_stream_unref(&channel2);
393 o_stream_unref(&file_output);
394 io_loop_destroy(&ioloop);
395 test_end();
396 }
397
test_ostream_multiplex(void)398 void test_ostream_multiplex(void)
399 {
400 test_ostream_multiplex_simple();
401 test_ostream_multiplex_stream();
402 test_ostream_multiplex_cork();
403 test_ostream_multiplex_hang();
404 test_ostream_multiplex_flush_callback();
405 }
406