/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include <aws/common/clock.h>
#include <aws/common/condition_variable.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/io/event_loop.h>
#include <aws/io/pipe.h>
#include <aws/testing/aws_test_harness.h>

enum pipe_loop_setup {
    SAME_EVENT_LOOP,
    DIFFERENT_EVENT_LOOPS,
};

enum {
    SMALL_BUFFER_SIZE = 4,
    GIANT_BUFFER_SIZE = 1024 * 1024 * 32, /* 32MB */
};
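
/* SMALL_BUFFER_SIZE fits easily within the OS's internal pipe buffer, while GIANT_BUFFER_SIZE is
 * large enough that a single write can't complete until the read-end starts draining data
 * (see test_pipe_clean_up_cancels_pending_writes). */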

/* Used for tracking state in the pipe tests. */
struct pipe_state {
    /* Begin setup parameters */
    enum pipe_loop_setup loop_setup;
    size_t buffer_size;
    /* End setup parameters */

    struct aws_allocator *alloc;

    struct aws_pipe_read_end read_end;
    struct aws_pipe_write_end write_end;

    struct aws_event_loop *read_loop;
    struct aws_event_loop *write_loop;

    /* Since most pipe operations must be performed on the event-loop thread,
     * the `results` struct is used to signal the main thread that the tests are finished. */
    struct {
        struct aws_mutex mutex;
        struct aws_condition_variable condvar;
        bool read_end_closed;
        bool write_end_closed;
        int status_code; /* Set to non-zero if something goes wrong on the thread. */
    } results;

    struct {
        struct aws_byte_buf src;
        struct aws_byte_buf dst;
        size_t num_bytes_written;
    } buffers;

    struct {
        int error_code_to_monitor;         /* By default, monitors AWS_ERROR_SUCCESS aka normal readable events */
        int count;                         /* count of events that we're monitoring */
        int close_read_end_after_n_events; /* if set, close read-end when count reaches N */
    } readable_events;

    void *test_data; /* If a test needs special data */
};
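
/* PIPE_TEST_CASE (below) statically declares one pipe_state per test configuration. Each instance
 * is initialized by s_fixture_before() and zeroed again by s_fixture_after(), so no state carries
 * over between runs. */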

static int s_fixture_before(struct aws_allocator *allocator, void *ctx) {
    struct pipe_state *state = ctx;
    state->alloc = allocator;

    state->read_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
    ASSERT_NOT_NULL(state->read_loop);
    ASSERT_SUCCESS(aws_event_loop_run(state->read_loop));

    if (state->loop_setup == DIFFERENT_EVENT_LOOPS) {
        state->write_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
        ASSERT_NOT_NULL(state->write_loop);

        ASSERT_SUCCESS(aws_event_loop_run(state->write_loop));
    } else {
        state->write_loop = state->read_loop;
    }

    ASSERT_SUCCESS(aws_pipe_init(&state->read_end, state->read_loop, &state->write_end, state->write_loop, allocator));

    ASSERT_SUCCESS(aws_mutex_init(&state->results.mutex));

    ASSERT_SUCCESS(aws_condition_variable_init(&state->results.condvar));

    if (state->buffer_size > 0) {
        /* Create full src buffer, containing random content */
        ASSERT_SUCCESS(aws_byte_buf_init(&state->buffers.src, allocator, state->buffer_size));

        state->buffers.src.len = state->buffer_size;
        for (size_t i = 0; i < state->buffer_size; ++i) {
            state->buffers.src.buffer[i] = (uint8_t)(rand() % 256);
        }
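        /* Note: rand() is never seeded, so the "random" src content is deterministic from run to run. */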

        /* Create empty dst buffer, with zeroed out content */
        ASSERT_SUCCESS(aws_byte_buf_init(&state->buffers.dst, allocator, state->buffer_size));

        memset(state->buffers.dst.buffer, 0, state->buffers.dst.capacity);
    }

    return AWS_OP_SUCCESS;
}

/* Assumes the pipe's read-end and write-end are already cleaned up */
static int s_fixture_after(struct aws_allocator *allocator, int setup_res, void *ctx) {
    (void)allocator;
    (void)setup_res;

    struct pipe_state *state = ctx;

    aws_condition_variable_clean_up(&state->results.condvar);
    aws_mutex_clean_up(&state->results.mutex);

    if (state->read_loop) {
        aws_event_loop_destroy(state->read_loop);
    }

    if (state->write_loop != state->read_loop) {
        aws_event_loop_destroy(state->write_loop);
    }

    aws_byte_buf_clean_up(&state->buffers.src);
    aws_byte_buf_clean_up(&state->buffers.dst);

    AWS_ZERO_STRUCT(*state);

    return AWS_OP_SUCCESS;
}

/* Macro for declaring pipe tests.
 * Add pipe tests to CMakeLists.txt like so: add_pipe_test_case(NAME)
 *
 * Each pipe test is run in 2 different configurations:
 * 1) both ends of the pipe use the same event-loop
 * 2) each end of the pipe is on its own event-loop
 *
 * For each test with NAME, write a function with the following signature:
 * int test_NAME(struct pipe_state *state) {...}
 */
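/* For example, a hypothetical test named "my_feature" would be declared as:
 *   static int test_my_feature(struct pipe_state *state) { ... }
 *   PIPE_TEST_CASE(my_feature, SMALL_BUFFER_SIZE);
 * which generates the test cases `my_feature` (same loop) and `my_feature_2loops` (one loop per end). */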
#define PIPE_TEST_CASE(NAME, BUFFER_SIZE)                                                                   \
    static struct pipe_state NAME##_pipe_state_same_loop = {                                                \
        .loop_setup = SAME_EVENT_LOOP,                                                                      \
        .buffer_size = (BUFFER_SIZE),                                                                       \
    };                                                                                                      \
    static int test_##NAME##_same_loop(struct aws_allocator *allocator, void *ctx) {                        \
        (void)allocator;                                                                                    \
        struct pipe_state *state = ctx;                                                                     \
        return test_##NAME(state);                                                                          \
    }                                                                                                       \
    AWS_TEST_CASE_FIXTURE(                                                                                  \
        NAME, s_fixture_before, test_##NAME##_same_loop, s_fixture_after, &NAME##_pipe_state_same_loop)     \
                                                                                                            \
    static struct pipe_state NAME##_pipe_state_different_loops = {                                          \
        .loop_setup = DIFFERENT_EVENT_LOOPS,                                                                \
        .buffer_size = (BUFFER_SIZE),                                                                       \
    };                                                                                                      \
    static int test_##NAME##_different_loops(struct aws_allocator *allocator, void *ctx) {                  \
        (void)allocator;                                                                                    \
        struct pipe_state *state = ctx;                                                                     \
        return test_##NAME(state);                                                                          \
    }                                                                                                       \
    AWS_TEST_CASE_FIXTURE(                                                                                  \
        NAME##_2loops,                                                                                      \
        s_fixture_before,                                                                                   \
        test_##NAME##_different_loops,                                                                      \
        s_fixture_after,                                                                                    \
        &NAME##_pipe_state_different_loops)

/* Predicate checking whether work on the event-loop thread(s) is done.
 * Runs with results.mutex held, via aws_condition_variable_wait_pred(). */
static bool s_done_pred(void *user_data) {
    struct pipe_state *state = user_data;

    if (state->results.status_code != 0) {
        return true;
    }

    if (state->results.read_end_closed && state->results.write_end_closed) {
        return true;
    }

    return false;
}

/* Signal that work is done, due to an unexpected error */
static void s_signal_error(struct pipe_state *state) {
    aws_mutex_lock(&state->results.mutex);
    state->results.status_code = -1;
    aws_condition_variable_notify_all(&state->results.condvar);
    aws_mutex_unlock(&state->results.mutex);
}

static void s_signal_done_on_read_end_closed(struct pipe_state *state) {
    /* Signal that work might be done */
    aws_mutex_lock(&state->results.mutex);
    state->results.read_end_closed = true;
    aws_condition_variable_notify_all(&state->results.condvar);
    aws_mutex_unlock(&state->results.mutex);
}

static void s_signal_done_on_write_end_closed(struct pipe_state *state) {
    /* Signal that work might be done */
    aws_mutex_lock(&state->results.mutex);
    state->results.write_end_closed = true;
    aws_condition_variable_notify_all(&state->results.condvar);
    aws_mutex_unlock(&state->results.mutex);
}

static int s_pipe_state_check_copied_data(struct pipe_state *state) {
    ASSERT_UINT_EQUALS(state->buffer_size, state->buffers.num_bytes_written);
    ASSERT_TRUE(aws_byte_buf_eq(&state->buffers.src, &state->buffers.dst));
    return AWS_OP_SUCCESS;
}

/* Use as "simplified" task functions in pipe_state tasks.
 * The boilerplate of task scheduling and error-checking is handled by wrapper functions. */
typedef void(pipe_state_task_fn)(struct pipe_state *state);

struct pipe_state_task_wrapper {
    struct aws_task task;
    struct pipe_state *state;
    pipe_state_task_fn *wrapped_fn;
};
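
/* Each scheduled task gets its own heap-allocated wrapper, which s_pipe_state_task_wrapper_fn
 * releases before invoking the wrapped function, so a wrapped function can safely reschedule
 * itself (as s_read_everything_task does). */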
static void s_pipe_state_task_wrapper_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)task;
    struct pipe_state_task_wrapper *wrapper = arg;
    struct pipe_state *state = wrapper->state;
    pipe_state_task_fn *wrapped_fn = wrapper->wrapped_fn;

    aws_mem_release(state->alloc, wrapper);

    if (status == AWS_TASK_STATUS_RUN_READY) {
        wrapped_fn(state);
    } else {
        s_signal_error(state);
    }
}

/* Schedules a pipe_state_task_fn */
static void s_schedule_task(
    struct pipe_state *state,
    struct aws_event_loop *loop,
    pipe_state_task_fn *fn,
    int delay_secs) {
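
    /* A delay of 0 runs the task as soon as possible; otherwise the task is scheduled
     * delay_secs in the future, measured against the event loop's own clock. */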
    struct pipe_state_task_wrapper *wrapper = aws_mem_acquire(state->alloc, sizeof(struct pipe_state_task_wrapper));
    if (!wrapper) {
        goto error;
    }

    aws_task_init(&wrapper->task, s_pipe_state_task_wrapper_fn, wrapper, "pipe_state");
    wrapper->wrapped_fn = fn;
    wrapper->state = state;

    if (delay_secs == 0) {
        aws_event_loop_schedule_task_now(loop, &wrapper->task);
    } else {
        uint64_t run_at_ns;
        int err = aws_event_loop_current_clock_time(loop, &run_at_ns);
        if (err) {
            goto error;
        }
        run_at_ns += aws_timestamp_convert((uint64_t)delay_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);

        aws_event_loop_schedule_task_future(loop, &wrapper->task, run_at_ns);
    }

    return;
error:
    s_signal_error(state);
}

static void s_schedule_read_end_task(struct pipe_state *state, pipe_state_task_fn *fn) {
    s_schedule_task(state, state->read_loop, fn, 0);
}

static void s_schedule_write_end_task(struct pipe_state *state, pipe_state_task_fn *fn) {
    s_schedule_task(state, state->write_loop, fn, 0);
}

/* Wait for pipe_state to indicate that it's done. Returns the status_code set by the
 * event-loop threads, which is non-zero if anything went wrong. */
static int s_wait_for_results(struct pipe_state *state) {
    ASSERT_SUCCESS(aws_mutex_lock(&state->results.mutex));
    ASSERT_SUCCESS(
        aws_condition_variable_wait_pred(&state->results.condvar, &state->results.mutex, s_done_pred, state));
    ASSERT_SUCCESS(aws_mutex_unlock(&state->results.mutex));

    return state->results.status_code;
}

static void s_clean_up_read_end_task(struct pipe_state *state) {
    int err = aws_pipe_clean_up_read_end(&state->read_end);
    if (err) {
        goto error;
    }

    s_signal_done_on_read_end_closed(state);
    return;

error:
    s_signal_error(state);
}

static void s_clean_up_write_end_task(struct pipe_state *state) {
    int err = aws_pipe_clean_up_write_end(&state->write_end);
    if (err) {
        goto error;
    }

    s_signal_done_on_write_end_closed(state);

    return;

error:
    s_signal_error(state);
}

/* Just test the pipe being opened and closed */
static int test_pipe_open_close(struct pipe_state *state) {
    s_schedule_read_end_task(state, s_clean_up_read_end_task);
    s_schedule_write_end_task(state, s_clean_up_write_end_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_open_close, SMALL_BUFFER_SIZE);

static void s_clean_up_write_end_on_write_completed(
    struct aws_pipe_write_end *write_end,
    int error_code,
    struct aws_byte_cursor src_buffer,
    void *user_data) {

    struct pipe_state *state = user_data;

    if (!error_code) {
        state->buffers.num_bytes_written += src_buffer.len;
    }

    int err = aws_pipe_clean_up_write_end(write_end);
    if (err) {
        goto error;
    }

    s_signal_done_on_write_end_closed(state);

    return;
error:
    s_signal_error(state);
}

/* Write everything in the buffer, clean up write-end when write completes */
static void s_write_once_task(struct pipe_state *state) {
    struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
    int err = aws_pipe_write(&state->write_end, cursor, s_clean_up_write_end_on_write_completed, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

/* Task tries to read as much data as possible.
 * Task repeatedly reschedules itself until read-buffer is full, then it cleans up the read-end */
static void s_read_everything_task(struct pipe_state *state) {
    int err = aws_pipe_read(&state->read_end, &state->buffers.dst, NULL);

    /* AWS_IO_READ_WOULD_BLOCK is an acceptable error, it just means the data's not ready yet */
    if (err && (aws_last_error() != AWS_IO_READ_WOULD_BLOCK)) {
        goto error;
    }

    if (state->buffers.dst.len < state->buffers.dst.capacity) {
        s_schedule_read_end_task(state, s_read_everything_task);
    } else {
        err = aws_pipe_clean_up_read_end(&state->read_end);
        if (err) {
            goto error;
        }
        s_signal_done_on_read_end_closed(state);
    }

    return;
error:
    s_signal_error(state);
}

/* common function used by small-buffer test and large-buffer test */
static int s_test_pipe_read_write(struct pipe_state *state) {
    s_schedule_read_end_task(state, s_read_everything_task);
    s_schedule_write_end_task(state, s_write_once_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_SUCCESS(s_pipe_state_check_copied_data(state));

    return AWS_OP_SUCCESS;
}

/* Test that a small buffer can be sent through the pipe */
static int test_pipe_read_write(struct pipe_state *state) {
    return s_test_pipe_read_write(state);
}

PIPE_TEST_CASE(pipe_read_write, SMALL_BUFFER_SIZE);

/* Test that a large buffer can be sent through the pipe. */
static int test_pipe_read_write_large_buffer(struct pipe_state *state) {
    return s_test_pipe_read_write(state);
}

PIPE_TEST_CASE(pipe_read_write_large_buffer, GIANT_BUFFER_SIZE);

static void s_on_readable_event(struct aws_pipe_read_end *read_end, int error_code, void *user_data) {

    struct pipe_state *state = user_data;

    if (error_code == state->readable_events.error_code_to_monitor) {
        state->readable_events.count++;

        if (state->readable_events.count == state->readable_events.close_read_end_after_n_events) {
            int err = aws_pipe_clean_up_read_end(read_end);
            if (err) {
                goto error;
            }
            s_signal_done_on_read_end_closed(state);
        }
    }

    return;
error:
    s_signal_error(state);
}

static void s_subscribe_task(struct pipe_state *state) {
    int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_on_readable_event, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

static int test_pipe_readable_event_sent_after_write(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_read_end_task(state, s_subscribe_task);
    s_schedule_write_end_task(state, s_write_once_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_after_write, SMALL_BUFFER_SIZE);

static void s_sentonce_on_readable_event(struct aws_pipe_read_end *read_end, int events, void *user_data) {
    struct pipe_state *state = user_data;

    int prev_events_count = state->readable_events.count;

    /* invoke usual readable callback so the events are logged */
    s_on_readable_event(read_end, events, user_data);
    if (state->results.status_code) { /* bail out if anything went wrong */
        return;
    }

    /* when the 1st readable event comes in, schedule task to close read-end after waiting a bit.
     * this lets us observe any further events that might come in */
    if ((state->readable_events.count == 1) && (prev_events_count == 0)) {
        s_schedule_task(state, state->read_loop, s_clean_up_read_end_task, 1 /*delay*/);
    }
}

static void s_sentonce_subscribe_task(struct pipe_state *state) {
    int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_sentonce_on_readable_event, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

/* Check that the readable event is only sent once after a write.
 * Short name for this test: sentonce */
static int test_pipe_readable_event_sent_once(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;

    s_schedule_read_end_task(state, s_sentonce_subscribe_task);
    s_schedule_write_end_task(state, s_write_once_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    /* Accept 1 or 2 events. Epoll notifies about "readable" when sending the "write end closed" event.
     * That's fine; we just don't want dozens of readable events to have come in. */
    ASSERT_TRUE(state->readable_events.count <= 2);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_once, SMALL_BUFFER_SIZE);

static void s_subscribe_on_write_completed(
    struct aws_pipe_write_end *write_end,
    int error_code,
    struct aws_byte_cursor src_buffer,
    void *user_data) {

    struct pipe_state *state = user_data;

    if (!error_code) {
        state->buffers.num_bytes_written += src_buffer.len;
    }

    int err = aws_pipe_clean_up_write_end(write_end);
    if (err) {
        goto error;
    }
    s_signal_done_on_write_end_closed(state);

    /* Tell read end to subscribe */
    s_schedule_read_end_task(state, s_subscribe_task);

    return;
error:
    s_signal_error(state);
}

/* Write all data. When write completes, write-end cleans up and tells the read-end to subscribe */
static void s_write_once_then_subscribe_task(struct pipe_state *state) {
    struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
    int err = aws_pipe_write(&state->write_end, cursor, s_subscribe_on_write_completed, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

static int test_pipe_readable_event_sent_on_subscribe_if_data_present(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_write_end_task(state, s_write_once_then_subscribe_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_on_subscribe_if_data_present, SMALL_BUFFER_SIZE);

static void s_resubscribe_on_readable_event(struct aws_pipe_read_end *read_end, int events, void *user_data) {
    struct pipe_state *state = user_data;
    int err = 0;

    int prev_events_count = state->readable_events.count;

    /* invoke usual readable callback so the events are logged */
    s_on_readable_event(read_end, events, user_data);
    if (state->results.status_code) { /* bail out if anything went wrong */
        return;
    }

    if ((state->readable_events.count == 1) && (prev_events_count == 0)) {
        /* unsubscribe and resubscribe */
        err = aws_pipe_unsubscribe_from_readable_events(&state->read_end);
        if (err) {
            goto error;
        }

        err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_on_readable_event, state);
        if (err) {
            goto error;
        }
    }

    return;
error:
    s_signal_error(state);
}

static void s_resubscribe_1_task(struct pipe_state *state) {
    int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_resubscribe_on_readable_event, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

static void s_resubscribe_write_task(struct pipe_state *state) {
    struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
    int err = aws_pipe_write(&state->write_end, cursor, s_clean_up_write_end_on_write_completed, state);
    if (err) {
        goto error;
    }

    /* schedule task for read-end to perform 1st subscribe */
    s_schedule_read_end_task(state, s_resubscribe_1_task);

    return;
error:
    s_signal_error(state);
}

static int test_pipe_readable_event_sent_on_resubscribe_if_data_present(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 2;

    s_schedule_write_end_task(state, s_resubscribe_write_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(2, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_on_resubscribe_if_data_present, SMALL_BUFFER_SIZE);

static void s_readall_on_write_completed(
    struct aws_pipe_write_end *write_end,
    int error_code,
    struct aws_byte_cursor src_buffer,
    void *user_data) {

    struct pipe_state *state = user_data;
    int err = 0;

    if (error_code) {
        goto error;
    }

    bool is_2nd_write = (state->buffers.num_bytes_written > 0);

    state->buffers.num_bytes_written += src_buffer.len;

    /* Clean up after 2nd write */
    if (is_2nd_write) {
        err = aws_pipe_clean_up_write_end(write_end);
        if (err) {
            goto error;
        }
        s_signal_done_on_write_end_closed(state);
    }

    return;
error:
    s_signal_error(state);
}

static void s_readall_write_task(struct pipe_state *state) {
    struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
    int err = aws_pipe_write(&state->write_end, cursor, s_readall_on_write_completed, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

static void s_readall_on_readable(struct aws_pipe_read_end *read_end, int events, void *user_data) {
    struct pipe_state *state = user_data;
    int err = 0;

    int prev_event_count = state->readable_events.count;

    /* invoke usual readable callback so the events are logged */
    s_on_readable_event(read_end, events, user_data);
    if (state->results.status_code) { /* bail out if anything went wrong */
        return;
    }

    if ((state->readable_events.count == 1) && (prev_event_count == 0)) {
        size_t total_bytes_read = 0;

        /* After the first write, read data until we're told that further reads would block.
         * This ensures that the next write is sure to trigger a readable event */
        while (true) {
            state->buffers.dst.len = 0;
            err = aws_pipe_read(read_end, &state->buffers.dst, NULL);
            total_bytes_read += state->buffers.dst.len;

            if (err) {
                if (aws_last_error() == AWS_IO_READ_WOULD_BLOCK) {
                    break;
                }
                goto error;
            }
        }

        /* Sanity check that we did in fact read something */
        if (total_bytes_read == 0) {
            goto error;
        }

        /* Schedule the 2nd write */
        s_schedule_write_end_task(state, s_readall_write_task);
    }

    return;
error:
    s_signal_error(state);
}

static void s_readall_subscribe_task(struct pipe_state *state) {
    int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_readall_on_readable, state);
    if (err) {
        goto error;
    }

    return;
error:
    s_signal_error(state);
}

/* Check that a 2nd readable event is sent in the sequence: subscribe, write 1, read all, write 2.
 * Short name for this test: readall */
static int test_pipe_readable_event_sent_again_after_all_data_read(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 2;

    s_schedule_read_end_task(state, s_readall_subscribe_task);
    s_schedule_write_end_task(state, s_readall_write_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(2, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_again_after_all_data_read, SMALL_BUFFER_SIZE);

static void s_subscribe_and_schedule_write_end_clean_up_task(struct pipe_state *state) {
    int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_on_readable_event, state);
    if (err) {
        goto error;
    }

    /* schedule write end to clean up */
    s_schedule_write_end_task(state, s_clean_up_write_end_task);

    return;
error:
    s_signal_error(state);
}

static int test_pipe_error_event_sent_after_write_end_closed(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_IO_BROKEN_PIPE;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_read_end_task(state, s_subscribe_and_schedule_write_end_clean_up_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_error_event_sent_after_write_end_closed, SMALL_BUFFER_SIZE);

static void s_clean_up_write_end_then_schedule_subscribe_task(struct pipe_state *state) {
    int err = aws_pipe_clean_up_write_end(&state->write_end);
    if (err) {
        goto error;
    }
    s_signal_done_on_write_end_closed(state);

    s_schedule_read_end_task(state, s_subscribe_task);

    return;
error:
    s_signal_error(state);
}

static int test_pipe_error_event_sent_on_subscribe_if_write_end_already_closed(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_IO_BROKEN_PIPE;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_write_end_task(state, s_clean_up_write_end_then_schedule_subscribe_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_error_event_sent_on_subscribe_if_write_end_already_closed, SMALL_BUFFER_SIZE);

static void s_close_write_end_after_all_writes_completed(
    struct aws_pipe_write_end *write_end,
    int error_code,
    struct aws_byte_cursor src_buffer,
    void *user_data) {

    struct pipe_state *state = user_data;

    if (error_code) {
        goto error;
    }

    state->buffers.num_bytes_written += src_buffer.len;

    if (state->buffers.num_bytes_written == state->buffer_size) {
        int err = aws_pipe_clean_up_write_end(write_end);
        if (err) {
            goto error;
        }
        s_signal_done_on_write_end_closed(state);
    }

    return;
error:
    s_signal_error(state);
}

static void s_write_in_simultaneous_chunks_task(struct pipe_state *state) {
    /* Write the whole buffer via several successive writes */
    struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
    const size_t chunk_size = cursor.len / 8;
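    /* Integer division: the loop below issues 8 equal-sized writes, plus a smaller final write
     * if the buffer size isn't evenly divisible */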
    while (cursor.len > 0) {
        size_t bytes_to_write = (chunk_size < cursor.len) ? chunk_size : cursor.len;
        struct aws_byte_cursor chunk_cursor = aws_byte_cursor_from_array(cursor.ptr, bytes_to_write);

        int err = aws_pipe_write(&state->write_end, chunk_cursor, s_close_write_end_after_all_writes_completed, state);
        if (err) {
            goto error;
        }

        aws_byte_cursor_advance(&cursor, bytes_to_write);
    }

    return;
error:
    s_signal_error(state);
}

static int test_pipe_writes_are_fifo(struct pipe_state *state) {
    s_schedule_read_end_task(state, s_read_everything_task);
    s_schedule_write_end_task(state, s_write_in_simultaneous_chunks_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_SUCCESS(s_pipe_state_check_copied_data(state));

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_writes_are_fifo, GIANT_BUFFER_SIZE);

static void s_cancelled_on_write_completed(
    struct aws_pipe_write_end *write_end,
    int error_code,
    struct aws_byte_cursor src_buffer,
    void *user_data) {

    (void)write_end;
    struct pipe_state *state = user_data;

    int *write_status_code = state->test_data;
    *write_status_code = error_code;

    if (!error_code) {
        state->buffers.num_bytes_written += src_buffer.len;
    }

    s_schedule_read_end_task(state, s_clean_up_read_end_task);
}

static void s_write_then_clean_up_task(struct pipe_state *state) {
    struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
    int err = aws_pipe_write(&state->write_end, cursor, s_cancelled_on_write_completed, state);
    if (err) {
        goto error;
    }

    err = aws_pipe_clean_up_write_end(&state->write_end);
    if (err) {
        goto error;
    }
    s_signal_done_on_write_end_closed(state);

    return;
error:
    s_signal_error(state);
}

/* Perform an enormous write that can't possibly complete without a bit of reading.
 * After kicking off the write operation, close the write-end.
 * The write operation should complete with an error status. */
static int test_pipe_clean_up_cancels_pending_writes(struct pipe_state *state) {
    /* capture the status code from the on-write-complete callback */
    int write_status_code = 0;
    state->test_data = &write_status_code;

    s_schedule_write_end_task(state, s_write_then_clean_up_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(AWS_IO_BROKEN_PIPE, write_status_code);
    ASSERT_TRUE(state->buffers.num_bytes_written < state->buffer_size);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_clean_up_cancels_pending_writes, GIANT_BUFFER_SIZE);