1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 #include <aws/common/clock.h>
6 #include <aws/common/condition_variable.h>
7 #include <aws/common/mutex.h>
8 #include <aws/common/task_scheduler.h>
9 #include <aws/io/event_loop.h>
10 #include <aws/io/pipe.h>
11 #include <aws/testing/aws_test_harness.h>
12 
/* Selects whether both ends of the pipe share one event-loop or each end gets its own. */
enum pipe_loop_setup {
    SAME_EVENT_LOOP = 0,
    DIFFERENT_EVENT_LOOPS = 1,
};
17 
/* Buffer sizes, in bytes, that tests pass as the BUFFER_SIZE setup parameter. */
enum {
    SMALL_BUFFER_SIZE = 4,
    GIANT_BUFFER_SIZE = 32 * 1024 * 1024, /* 32MB */
};
22 
/* Used for tracking state in the pipe tests.
 * One instance exists per registered test configuration; the fixture fills it in before the test
 * runs and zeroes it again afterwards. */
struct pipe_state {
    /* Begin setup parameters */
    enum pipe_loop_setup loop_setup; /* whether write_loop is the same loop as read_loop */
    size_t buffer_size;              /* size of src/dst buffers; 0 means no buffers are created */
    /* End setup parameters */

    /* Allocator handed to the fixture; also used for the buffers and the scheduled task wrappers */
    struct aws_allocator *alloc;

    /* The two ends of the pipe, initialized together by the fixture */
    struct aws_pipe_read_end read_end;
    struct aws_pipe_write_end write_end;

    /* Event-loops that the ends are bound to. In the SAME_EVENT_LOOP setup both point at one loop. */
    struct aws_event_loop *read_loop;
    struct aws_event_loop *write_loop;

    /* Since most pipe operations must be performed on the event-loop thread,
     * the `results` struct is used to signal the main thread that the tests are finished. */
    struct {
        struct aws_mutex mutex;                /* guards writes to the fields below */
        struct aws_condition_variable condvar; /* notified whenever one of the fields below is set */
        bool read_end_closed;
        bool write_end_closed;
        int status_code; /* Set to non-zero if something goes wrong on the thread. */
    } results;

    struct {
        struct aws_byte_buf src;  /* filled with random bytes by the fixture; the data that gets written */
        struct aws_byte_buf dst;  /* zeroed by the fixture; the data that gets read */
        size_t num_bytes_written; /* accumulated by the write-completion callbacks */
    } buffers;

    struct {
        int error_code_to_monitor;         /* By default, monitors AWS_ERROR_SUCCESS aka normal readable events */
        int count;                         /* count of events that we're monitoring */
        int close_read_end_after_n_events; /* if set, close read-end when count reaches N */
    } readable_events;

    void *test_data; /* If a test needs special data */
};
62 
s_fixture_before(struct aws_allocator * allocator,void * ctx)63 static int s_fixture_before(struct aws_allocator *allocator, void *ctx) {
64     struct pipe_state *state = ctx;
65     state->alloc = allocator;
66 
67     state->read_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
68     ASSERT_NOT_NULL(state->read_loop);
69     ASSERT_SUCCESS(aws_event_loop_run(state->read_loop));
70 
71     if (state->loop_setup == DIFFERENT_EVENT_LOOPS) {
72         state->write_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks);
73         ASSERT_NOT_NULL(state->write_loop);
74 
75         ASSERT_SUCCESS(aws_event_loop_run(state->write_loop));
76     } else {
77         state->write_loop = state->read_loop;
78     }
79 
80     ASSERT_SUCCESS(aws_pipe_init(&state->read_end, state->read_loop, &state->write_end, state->write_loop, allocator));
81 
82     ASSERT_SUCCESS(aws_mutex_init(&state->results.mutex));
83 
84     ASSERT_SUCCESS(aws_condition_variable_init(&state->results.condvar));
85 
86     if (state->buffer_size > 0) {
87         /* Create full src buffer, containing random content */
88         ASSERT_SUCCESS(aws_byte_buf_init(&state->buffers.src, allocator, state->buffer_size));
89 
90         state->buffers.src.len = state->buffer_size;
91         for (size_t i = 0; i < state->buffer_size; ++i) {
92             state->buffers.src.buffer[i] = (uint8_t)(rand() % 256);
93         }
94 
95         /* Create empty dst buffer, with zeroed out content */
96         ASSERT_SUCCESS(aws_byte_buf_init(&state->buffers.dst, allocator, state->buffer_size));
97 
98         memset(state->buffers.dst.buffer, 0, state->buffers.dst.capacity);
99     }
100 
101     return AWS_OP_SUCCESS;
102 }
103 
104 /* Assumes the pipe's read-end and write-end are already cleaned up */
s_fixture_after(struct aws_allocator * allocator,int setup_res,void * ctx)105 static int s_fixture_after(struct aws_allocator *allocator, int setup_res, void *ctx) {
106     (void)allocator;
107     (void)setup_res;
108 
109     struct pipe_state *state = ctx;
110 
111     aws_condition_variable_clean_up(&state->results.condvar);
112     aws_mutex_clean_up(&state->results.mutex);
113 
114     if (state->read_loop) {
115         aws_event_loop_destroy(state->read_loop);
116     }
117 
118     if (state->write_loop != state->read_loop) {
119         aws_event_loop_destroy(state->write_loop);
120     }
121 
122     aws_byte_buf_clean_up(&state->buffers.src);
123     aws_byte_buf_clean_up(&state->buffers.dst);
124 
125     AWS_ZERO_STRUCT(*state);
126 
127     return AWS_OP_SUCCESS;
128 }
129 
/* Macro for declaring pipe tests.
 * Add pipe tests to CMakeLists.txt like so: add_pipe_test_case(NAME)
 *
 * Each pipe test is run in 2 different configurations:
 * 1) both ends of the pipe use the same event-loop (registered as NAME)
 * 2) each end of the pipe is on its own event-loop (registered as NAME_2loops)
 *
 * For each test with NAME, write a function with the following signature:
 * int test_NAME(struct pipe_state *state) {...}
 *
 * BUFFER_SIZE becomes the `buffer_size` setup parameter of both configurations.
 * For each configuration the macro defines a static struct pipe_state holding the setup parameters,
 * a small adapter that forwards the fixture's ctx to test_NAME(), and an AWS_TEST_CASE_FIXTURE
 * registration that wires in s_fixture_before and s_fixture_after.
 */
#define PIPE_TEST_CASE(NAME, BUFFER_SIZE)                                                                              \
    static struct pipe_state NAME##_pipe_state_same_loop = {                                                           \
        .loop_setup = SAME_EVENT_LOOP,                                                                                 \
        .buffer_size = (BUFFER_SIZE),                                                                                  \
    };                                                                                                                 \
    static int test_##NAME##_same_loop(struct aws_allocator *allocator, void *ctx) {                                   \
        (void)allocator;                                                                                               \
        struct pipe_state *state = ctx;                                                                                \
        return test_##NAME(state);                                                                                     \
    }                                                                                                                  \
    AWS_TEST_CASE_FIXTURE(                                                                                             \
        NAME, s_fixture_before, test_##NAME##_same_loop, s_fixture_after, &NAME##_pipe_state_same_loop)                \
                                                                                                                       \
    static struct pipe_state NAME##_pipe_state_different_loops = {                                                     \
        .loop_setup = DIFFERENT_EVENT_LOOPS,                                                                           \
        .buffer_size = (BUFFER_SIZE),                                                                                  \
    };                                                                                                                 \
    static int test_##NAME##_different_loops(struct aws_allocator *allocator, void *ctx) {                             \
        (void)allocator;                                                                                               \
        struct pipe_state *state = ctx;                                                                                \
        return test_##NAME(state);                                                                                     \
    }                                                                                                                  \
    AWS_TEST_CASE_FIXTURE(                                                                                             \
        NAME##_2loops,                                                                                                 \
        s_fixture_before,                                                                                              \
        test_##NAME##_different_loops,                                                                                 \
        s_fixture_after,                                                                                               \
        &NAME##_pipe_state_different_loops)
168 
169 /* Checking if work on thread is done */
s_done_pred(void * user_data)170 static bool s_done_pred(void *user_data) {
171     struct pipe_state *state = user_data;
172 
173     if (state->results.status_code != 0) {
174         return true;
175     }
176 
177     if (state->results.read_end_closed && state->results.write_end_closed) {
178         return true;
179     }
180 
181     return false;
182 }
183 
184 /* Signal that work is done, due to an unexpected error */
s_signal_error(struct pipe_state * state)185 static void s_signal_error(struct pipe_state *state) {
186     aws_mutex_lock(&state->results.mutex);
187     state->results.status_code = -1;
188     aws_condition_variable_notify_all(&state->results.condvar);
189     aws_mutex_unlock(&state->results.mutex);
190 }
191 
s_signal_done_on_read_end_closed(struct pipe_state * state)192 static void s_signal_done_on_read_end_closed(struct pipe_state *state) {
193     /* Signal that work might be done */
194     aws_mutex_lock(&state->results.mutex);
195     state->results.read_end_closed = true;
196     aws_condition_variable_notify_all(&state->results.condvar);
197     aws_mutex_unlock(&state->results.mutex);
198 }
199 
s_signal_done_on_write_end_closed(struct pipe_state * state)200 static void s_signal_done_on_write_end_closed(struct pipe_state *state) {
201     /* Signal that work might be done */
202     aws_mutex_lock(&state->results.mutex);
203     state->results.write_end_closed = true;
204     aws_condition_variable_notify_all(&state->results.condvar);
205     aws_mutex_unlock(&state->results.mutex);
206 }
207 
/* Verifies a completed transfer: the number of bytes reported written must equal the configured
 * buffer size, and the dst buffer must compare equal to the src buffer.
 * Returns AWS_OP_SUCCESS, or fails via the ASSERT macros. */
static int s_pipe_state_check_copied_data(struct pipe_state *state) {
    ASSERT_UINT_EQUALS(state->buffer_size, state->buffers.num_bytes_written);
    ASSERT_TRUE(aws_byte_buf_eq(&state->buffers.src, &state->buffers.dst));
    return AWS_OP_SUCCESS;
}
213 
/* Use as "simplified" task functions in pipe_state tasks.
 * The boilerplate of task scheduling and error-checking are handled by wrapper functions.
 * A task function receives only the test's pipe_state and reports failure via s_signal_error(). */
typedef void(pipe_state_task_fn)(struct pipe_state *state);
217 
/* Heap-allocated bundle that carries a pipe_state_task_fn through the event-loop's task API.
 * Allocated by s_schedule_task() and released by s_pipe_state_task_wrapper_fn() when the task fires. */
struct pipe_state_task_wrapper {
    struct aws_task task;           /* the task handed to the event-loop */
    struct pipe_state *state;       /* passed to wrapped_fn; also supplies the allocator for release */
    pipe_state_task_fn *wrapped_fn; /* the simplified task function to invoke */
};
223 
s_pipe_state_task_wrapper_fn(struct aws_task * task,void * arg,enum aws_task_status status)224 static void s_pipe_state_task_wrapper_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
225     (void)task;
226     struct pipe_state_task_wrapper *wrapper = arg;
227     struct pipe_state *state = wrapper->state;
228     pipe_state_task_fn *wrapped_fn = wrapper->wrapped_fn;
229 
230     aws_mem_release(state->alloc, wrapper);
231 
232     if (status == AWS_TASK_STATUS_RUN_READY) {
233         wrapped_fn(state);
234     } else {
235         s_signal_error(state);
236     }
237 }
238 
239 /* Schedules a pipe_state_task_fn */
s_schedule_task(struct pipe_state * state,struct aws_event_loop * loop,pipe_state_task_fn * fn,int delay_secs)240 static void s_schedule_task(
241     struct pipe_state *state,
242     struct aws_event_loop *loop,
243     pipe_state_task_fn *fn,
244     int delay_secs) {
245 
246     struct pipe_state_task_wrapper *wrapper = aws_mem_acquire(state->alloc, sizeof(struct pipe_state_task_wrapper));
247     if (!wrapper) {
248         goto error;
249     }
250 
251     aws_task_init(&wrapper->task, s_pipe_state_task_wrapper_fn, wrapper, "pipe_state");
252     wrapper->wrapped_fn = fn;
253     wrapper->state = state;
254 
255     if (delay_secs == 0) {
256         aws_event_loop_schedule_task_now(loop, &wrapper->task);
257     } else {
258         uint64_t run_at_ns;
259         int err = aws_event_loop_current_clock_time(loop, &run_at_ns);
260         if (err) {
261             goto error;
262         }
263         run_at_ns += aws_timestamp_convert((uint64_t)delay_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
264 
265         aws_event_loop_schedule_task_future(loop, &wrapper->task, run_at_ns);
266     }
267 
268     return;
269 error:
270     s_signal_error(state);
271 }
272 
/* Runs `fn` on the read-end's event-loop with no delay. */
static void s_schedule_read_end_task(struct pipe_state *state, pipe_state_task_fn *fn) {
    struct aws_event_loop *target_loop = state->read_loop;

    s_schedule_task(state, target_loop, fn, 0 /*delay*/);
}
276 
/* Runs `fn` on the write-end's event-loop with no delay. */
static void s_schedule_write_end_task(struct pipe_state *state, pipe_state_task_fn *fn) {
    struct aws_event_loop *target_loop = state->write_loop;

    s_schedule_task(state, target_loop, fn, 0 /*delay*/);
}
280 
281 /* wait for pipe_state to indicate that it's done */
s_wait_for_results(struct pipe_state * state)282 static int s_wait_for_results(struct pipe_state *state) {
283     ASSERT_SUCCESS(aws_mutex_lock(&state->results.mutex));
284     ASSERT_SUCCESS(
285         aws_condition_variable_wait_pred(&state->results.condvar, &state->results.mutex, s_done_pred, state));
286     ASSERT_SUCCESS(aws_mutex_unlock(&state->results.mutex));
287 
288     return state->results.status_code;
289 }
290 
s_clean_up_read_end_task(struct pipe_state * state)291 static void s_clean_up_read_end_task(struct pipe_state *state) {
292     int err = aws_pipe_clean_up_read_end(&state->read_end);
293     if (err) {
294         goto error;
295     }
296 
297     s_signal_done_on_read_end_closed(state);
298     return;
299 
300 error:
301     s_signal_error(state);
302 }
303 
s_clean_up_write_end_task(struct pipe_state * state)304 static void s_clean_up_write_end_task(struct pipe_state *state) {
305     int err = aws_pipe_clean_up_write_end(&state->write_end);
306     if (err) {
307         goto error;
308     }
309 
310     s_signal_done_on_write_end_closed(state);
311 
312     return;
313 
314 error:
315     s_signal_error(state);
316 }
317 
/* Just test the pipe being opened and closed.
 * The fixture has already opened the pipe; this schedules the clean-up of each end on that end's
 * event-loop and waits until both ends have been reported closed (or an error was signalled). */
static int test_pipe_open_close(struct pipe_state *state) {
    s_schedule_read_end_task(state, s_clean_up_read_end_task);
    s_schedule_write_end_task(state, s_clean_up_write_end_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_open_close, SMALL_BUFFER_SIZE);
329 
s_clean_up_write_end_on_write_completed(struct aws_pipe_write_end * write_end,int error_code,struct aws_byte_cursor src_buffer,void * user_data)330 void s_clean_up_write_end_on_write_completed(
331     struct aws_pipe_write_end *write_end,
332     int error_code,
333     struct aws_byte_cursor src_buffer,
334     void *user_data) {
335 
336     struct pipe_state *state = user_data;
337 
338     if (!error_code) {
339         state->buffers.num_bytes_written += src_buffer.len;
340     }
341 
342     int err = aws_pipe_clean_up_write_end(write_end);
343     if (err) {
344         goto error;
345     }
346 
347     s_signal_done_on_write_end_closed(state);
348 
349     return;
350 error:
351     s_signal_error(state);
352 }
353 
354 /* Write everything in the buffer, clean up write-end when write completes*/
s_write_once_task(struct pipe_state * state)355 static void s_write_once_task(struct pipe_state *state) {
356     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
357     int err = aws_pipe_write(&state->write_end, cursor, s_clean_up_write_end_on_write_completed, state);
358     if (err) {
359         goto error;
360     }
361 
362     return;
363 error:
364     s_signal_error(state);
365 }
366 
367 /* Task tries to read as much data as possible.
368  * Task repeatedly reschedules itself until read-buffer is full, then it cleans up the read-end */
s_read_everything_task(struct pipe_state * state)369 static void s_read_everything_task(struct pipe_state *state) {
370     int err = aws_pipe_read(&state->read_end, &state->buffers.dst, NULL);
371 
372     /* AWS_IO_READ_WOULD_BLOCK is an acceptable error, it just means the data's not ready yet */
373     if (err && (aws_last_error() != AWS_IO_READ_WOULD_BLOCK)) {
374         goto error;
375     }
376 
377     if (state->buffers.dst.len < state->buffers.dst.capacity) {
378         s_schedule_read_end_task(state, s_read_everything_task);
379     } else {
380         err = aws_pipe_clean_up_read_end(&state->read_end);
381         if (err) {
382             goto error;
383         }
384         s_signal_done_on_read_end_closed(state);
385     }
386 
387     return;
388 error:
389     s_signal_error(state);
390 }
391 
/* common function used by small-buffer test and large-buffer test.
 * Schedules the reader (which keeps reading until dst is full) and a single write of the whole
 * src buffer, waits for both ends to close, then checks that dst matches src. */
static int s_test_pipe_read_write(struct pipe_state *state) {
    s_schedule_read_end_task(state, s_read_everything_task);
    s_schedule_write_end_task(state, s_write_once_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_SUCCESS(s_pipe_state_check_copied_data(state));

    return AWS_OP_SUCCESS;
}
403 
/* Test that a small buffer can be sent through the pipe.
 * Registered with SMALL_BUFFER_SIZE; the actual work is shared with the large-buffer test. */
static int test_pipe_read_write(struct pipe_state *state) {
    return s_test_pipe_read_write(state);
}

PIPE_TEST_CASE(pipe_read_write, SMALL_BUFFER_SIZE);
410 
/* Test that a large buffer can be sent through the pipe.
 * Registered with GIANT_BUFFER_SIZE; the actual work is shared with the small-buffer test. */
static int test_pipe_read_write_large_buffer(struct pipe_state *state) {
    return s_test_pipe_read_write(state);
}

PIPE_TEST_CASE(pipe_read_write_large_buffer, GIANT_BUFFER_SIZE);
417 
s_on_readable_event(struct aws_pipe_read_end * read_end,int error_code,void * user_data)418 static void s_on_readable_event(struct aws_pipe_read_end *read_end, int error_code, void *user_data) {
419 
420     struct pipe_state *state = user_data;
421 
422     if (error_code == state->readable_events.error_code_to_monitor) {
423         state->readable_events.count++;
424 
425         if (state->readable_events.count == state->readable_events.close_read_end_after_n_events) {
426             int err = aws_pipe_clean_up_read_end(read_end);
427             if (err) {
428                 goto error;
429             }
430             s_signal_done_on_read_end_closed(state);
431         }
432     }
433 
434     return;
435 error:
436     s_signal_error(state);
437 }
438 
s_subscribe_task(struct pipe_state * state)439 static void s_subscribe_task(struct pipe_state *state) {
440     int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_on_readable_event, state);
441     if (err) {
442         goto error;
443     }
444 
445     return;
446 error:
447     s_signal_error(state);
448 }
449 
/* Check that a readable event arrives after data is written.
 * The read-end subscribes, the write-end writes the src buffer once, and the read-end closes
 * itself on the first monitored (AWS_ERROR_SUCCESS) event. Exactly 1 such event is expected. */
static int test_pipe_readable_event_sent_after_write(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_read_end_task(state, s_subscribe_task);
    s_schedule_write_end_task(state, s_write_once_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_after_write, SMALL_BUFFER_SIZE);
465 
s_sentonce_on_readable_event(struct aws_pipe_read_end * read_end,int events,void * user_data)466 static void s_sentonce_on_readable_event(struct aws_pipe_read_end *read_end, int events, void *user_data) {
467     struct pipe_state *state = user_data;
468 
469     int prev_events_count = state->readable_events.count;
470 
471     /* invoke usual readable callback so the events are logged */
472     s_on_readable_event(read_end, events, user_data);
473     if (state->results.status_code) { /* bail out if anything went wrong */
474         return;
475     }
476 
477     /* when the 1st readable event comes in, schedule task to close read-end after waiting a bit.
478      * this lets us observe any further events that might come in */
479     if ((state->readable_events.count == 1) && (prev_events_count == 0)) {
480         s_schedule_task(state, state->read_loop, s_clean_up_read_end_task, 1 /*delay*/);
481     }
482 }
483 
s_sentonce_subscribe_task(struct pipe_state * state)484 static void s_sentonce_subscribe_task(struct pipe_state *state) {
485     int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_sentonce_on_readable_event, state);
486     if (err) {
487         goto error;
488     }
489 
490     return;
491 error:
492     s_signal_error(state);
493 }
494 
/* Check that readable event is only sent once after a write.
 * Short name for test is: sentonce
 *
 * The read-end subscribes and the write-end writes once. The read-end stays open for a short
 * while after the first event (see s_sentonce_on_readable_event) so that extra events, if any,
 * are counted before the test finishes. */
static int test_pipe_readable_event_sent_once(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;

    s_schedule_read_end_task(state, s_sentonce_subscribe_task);
    s_schedule_write_end_task(state, s_write_once_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    /* Accept 1 or 2 events. Epoll notifies about "readable" when sending "write end closed" event.
     * That's fine, we just don't want dozens of readable events to have come in. */
    ASSERT_TRUE(state->readable_events.count <= 2);

    return AWS_OP_SUCCESS;
}
PIPE_TEST_CASE(pipe_readable_event_sent_once, SMALL_BUFFER_SIZE);
512 
s_subscribe_on_write_completed(struct aws_pipe_write_end * write_end,int error_code,struct aws_byte_cursor src_buffer,void * user_data)513 void s_subscribe_on_write_completed(
514     struct aws_pipe_write_end *write_end,
515     int error_code,
516     struct aws_byte_cursor src_buffer,
517     void *user_data) {
518 
519     struct pipe_state *state = user_data;
520 
521     if (!error_code) {
522         state->buffers.num_bytes_written += src_buffer.len;
523     }
524 
525     int err = aws_pipe_clean_up_write_end(write_end);
526     if (err) {
527         goto error;
528     }
529     s_signal_done_on_write_end_closed(state);
530 
531     /* Tell read end to subscribe */
532     s_schedule_read_end_task(state, s_subscribe_task);
533 
534     return;
535 error:
536     s_signal_error(state);
537 }
538 
539 /* Write all data. When write completes, write-end cleans up and tells the read-end to subscribe */
s_write_once_then_subscribe_task(struct pipe_state * state)540 static void s_write_once_then_subscribe_task(struct pipe_state *state) {
541     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
542     int err = aws_pipe_write(&state->write_end, cursor, s_subscribe_on_write_completed, state);
543     if (err) {
544         goto error;
545     }
546 
547     return;
548 error:
549     s_signal_error(state);
550 }
551 
/* Check that subscribing AFTER data was written still yields a readable event.
 * The write-end writes once and, when that write completes, closes itself and has the read-end
 * subscribe. The read-end closes itself on the first monitored (AWS_ERROR_SUCCESS) event.
 * Exactly 1 such event is expected. */
static int test_pipe_readable_event_sent_on_subscribe_if_data_present(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_write_end_task(state, s_write_once_then_subscribe_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_on_subscribe_if_data_present, SMALL_BUFFER_SIZE);
566 
s_resubscribe_on_readable_event(struct aws_pipe_read_end * read_end,int events,void * user_data)567 static void s_resubscribe_on_readable_event(struct aws_pipe_read_end *read_end, int events, void *user_data) {
568     struct pipe_state *state = user_data;
569     int err = 0;
570 
571     int prev_events_count = state->readable_events.count;
572 
573     /* invoke usual readable callback so the events are logged */
574     s_on_readable_event(read_end, events, user_data);
575     if (state->results.status_code) { /* bail out if anything went wrong */
576         return;
577     }
578 
579     if ((state->readable_events.count == 1) && (prev_events_count == 0)) {
580         /* unsubscribe and resubscribe */
581         err = aws_pipe_unsubscribe_from_readable_events(&state->read_end);
582         if (err) {
583             goto error;
584         }
585 
586         err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_on_readable_event, state);
587         if (err) {
588             goto error;
589         }
590     }
591 
592     return;
593 error:
594     s_signal_error(state);
595 }
596 
s_resubscribe_1_task(struct pipe_state * state)597 static void s_resubscribe_1_task(struct pipe_state *state) {
598     int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_resubscribe_on_readable_event, state);
599     if (err) {
600         goto error;
601     }
602 
603     return;
604 error:
605     s_signal_error(state);
606 }
607 
s_resubscribe_write_task(struct pipe_state * state)608 static void s_resubscribe_write_task(struct pipe_state *state) {
609     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
610     int err = aws_pipe_write(&state->write_end, cursor, s_clean_up_write_end_on_write_completed, state);
611     if (err) {
612         goto error;
613     }
614 
615     /* schedule task for read-end to perform 1st subscribe */
616     s_schedule_read_end_task(state, s_resubscribe_1_task);
617 
618     return;
619 error:
620     s_signal_error(state);
621 }
622 
/* Check that a readable event is delivered again after unsubscribing and resubscribing
 * while data is still sitting in the pipe.
 * The write-end writes once and has the read-end subscribe; on the first event the read-end
 * unsubscribes and resubscribes. The read-end closes itself on the 2nd monitored
 * (AWS_ERROR_SUCCESS) event. Exactly 2 such events are expected. */
static int test_pipe_readable_event_sent_on_resubscribe_if_data_present(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 2;

    s_schedule_write_end_task(state, s_resubscribe_write_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(2, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_on_resubscribe_if_data_present, SMALL_BUFFER_SIZE);
637 
s_readall_on_write_completed(struct aws_pipe_write_end * write_end,int error_code,struct aws_byte_cursor src_buffer,void * user_data)638 static void s_readall_on_write_completed(
639     struct aws_pipe_write_end *write_end,
640     int error_code,
641     struct aws_byte_cursor src_buffer,
642     void *user_data) {
643 
644     struct pipe_state *state = user_data;
645     int err = 0;
646 
647     if (error_code) {
648         goto error;
649     }
650 
651     bool is_2nd_write = (state->buffers.num_bytes_written > 0);
652 
653     state->buffers.num_bytes_written += src_buffer.len;
654 
655     /* Clean up after 2nd write */
656     if (is_2nd_write) {
657         err = aws_pipe_clean_up_write_end(write_end);
658         if (err) {
659             goto error;
660         }
661         s_signal_done_on_write_end_closed(state);
662     }
663 
664     return;
665 error:
666     s_signal_error(state);
667 }
668 
s_readall_write_task(struct pipe_state * state)669 static void s_readall_write_task(struct pipe_state *state) {
670     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
671     int err = aws_pipe_write(&state->write_end, cursor, s_readall_on_write_completed, state);
672     if (err) {
673         goto error;
674     }
675 
676     return;
677 error:
678     s_signal_error(state);
679 }
680 
s_readall_on_readable(struct aws_pipe_read_end * read_end,int events,void * user_data)681 static void s_readall_on_readable(struct aws_pipe_read_end *read_end, int events, void *user_data) {
682     struct pipe_state *state = user_data;
683     int err = 0;
684 
685     int prev_event_count = state->readable_events.count;
686 
687     /* invoke usual readable callback so the events are logged */
688     s_on_readable_event(read_end, events, user_data);
689     if (state->results.status_code) { /* bail out if anything went wrong */
690         return;
691     }
692 
693     if ((state->readable_events.count == 1) && (prev_event_count == 0)) {
694         size_t total_bytes_read = 0;
695 
696         /* After the first write, read data until we're told that further reads would block.
697          * This ensures that the next write is sure to trigger a readable event */
698         while (true) {
699             state->buffers.dst.len = 0;
700             err = aws_pipe_read(read_end, &state->buffers.dst, NULL);
701             total_bytes_read += state->buffers.dst.len;
702 
703             if (err) {
704                 if (aws_last_error() == AWS_IO_READ_WOULD_BLOCK) {
705                     break;
706                 }
707                 goto error;
708             }
709         }
710 
711         /* Sanity check that we did in fact read something */
712         if (total_bytes_read == 0) {
713             goto error;
714         }
715 
716         /* Schedule the 2nd write */
717         s_schedule_write_end_task(state, s_readall_write_task);
718     }
719 
720     return;
721 error:
722     s_signal_error(state);
723 }
724 
s_readall_subscribe_task(struct pipe_state * state)725 static void s_readall_subscribe_task(struct pipe_state *state) {
726     int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_readall_on_readable, state);
727     if (err) {
728         goto error;
729     }
730 
731     return;
732 error:
733     s_signal_error(state);
734 }
735 
/* Check that the 2nd readable event is sent again in the case of: subscribe, write 1, read all, write 2
 * Short name for test is: readall
 *
 * The read-end closes itself on the 2nd monitored (AWS_ERROR_SUCCESS) event, and the write-end
 * closes itself once its 2nd write completes. Exactly 2 such events are expected. */
static int test_pipe_readable_event_sent_again_after_all_data_read(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_ERROR_SUCCESS;
    state->readable_events.close_read_end_after_n_events = 2;

    s_schedule_read_end_task(state, s_readall_subscribe_task);
    s_schedule_write_end_task(state, s_readall_write_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(2, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_readable_event_sent_again_after_all_data_read, SMALL_BUFFER_SIZE);
753 
s_subscribe_and_schedule_write_end_clean_up_task(struct pipe_state * state)754 static void s_subscribe_and_schedule_write_end_clean_up_task(struct pipe_state *state) {
755     int err = aws_pipe_subscribe_to_readable_events(&state->read_end, s_on_readable_event, state);
756     if (err) {
757         goto error;
758     }
759 
760     /* schedule write end to clean up */
761     s_schedule_write_end_task(state, s_clean_up_write_end_task);
762 
763     return;
764 error:
765     s_signal_error(state);
766 }
767 
/* Check that a subscribed read-end is told when the write-end closes.
 * The read-end subscribes first, then the write-end is cleaned up. Only events carrying
 * AWS_IO_BROKEN_PIPE are counted, and the read-end closes itself on the first one.
 * Exactly 1 such event is expected. */
static int test_pipe_error_event_sent_after_write_end_closed(struct pipe_state *state) {
    state->readable_events.error_code_to_monitor = AWS_IO_BROKEN_PIPE;
    state->readable_events.close_read_end_after_n_events = 1;

    s_schedule_read_end_task(state, s_subscribe_and_schedule_write_end_clean_up_task);

    ASSERT_SUCCESS(s_wait_for_results(state));

    ASSERT_INT_EQUALS(1, state->readable_events.count);

    return AWS_OP_SUCCESS;
}

PIPE_TEST_CASE(pipe_error_event_sent_after_write_end_closed, SMALL_BUFFER_SIZE);
782 
s_clean_up_write_end_then_schedule_subscribe_task(struct pipe_state * state)783 static void s_clean_up_write_end_then_schedule_subscribe_task(struct pipe_state *state) {
784     int err = aws_pipe_clean_up_write_end(&state->write_end);
785     if (err) {
786         goto error;
787     }
788     s_signal_done_on_write_end_closed(state);
789 
790     s_schedule_read_end_task(state, s_subscribe_task);
791 
792     return;
793 error:
794     s_signal_error(state);
795 }
796 
test_pipe_error_event_sent_on_subscribe_if_write_end_already_closed(struct pipe_state * state)797 static int test_pipe_error_event_sent_on_subscribe_if_write_end_already_closed(struct pipe_state *state) {
798     state->readable_events.error_code_to_monitor = AWS_IO_BROKEN_PIPE;
799     state->readable_events.close_read_end_after_n_events = 1;
800 
801     s_schedule_write_end_task(state, s_clean_up_write_end_then_schedule_subscribe_task);
802 
803     ASSERT_SUCCESS(s_wait_for_results(state));
804 
805     ASSERT_INT_EQUALS(1, state->readable_events.count);
806 
807     return AWS_OP_SUCCESS;
808 }
809 
810 PIPE_TEST_CASE(pipe_error_event_sent_on_subscribe_if_write_end_already_closed, SMALL_BUFFER_SIZE);
811 
s_close_write_end_after_all_writes_completed(struct aws_pipe_write_end * write_end,int error_code,struct aws_byte_cursor src_buffer,void * user_data)812 static void s_close_write_end_after_all_writes_completed(
813     struct aws_pipe_write_end *write_end,
814     int error_code,
815     struct aws_byte_cursor src_buffer,
816     void *user_data) {
817 
818     struct pipe_state *state = user_data;
819 
820     if (error_code) {
821         goto error;
822     }
823 
824     state->buffers.num_bytes_written += src_buffer.len;
825 
826     if (state->buffers.num_bytes_written == state->buffer_size) {
827         int err = aws_pipe_clean_up_write_end(write_end);
828         if (err) {
829             goto error;
830         }
831         s_signal_done_on_write_end_closed(state);
832     }
833 
834     return;
835 error:
836     s_signal_error(state);
837 }
838 
s_write_in_simultaneous_chunks_task(struct pipe_state * state)839 static void s_write_in_simultaneous_chunks_task(struct pipe_state *state) {
840     /* Write the whole buffer via several successive writes */
841     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
842     const size_t chunk_size = cursor.len / 8;
843     while (cursor.len > 0) {
844         size_t bytes_to_write = (chunk_size < cursor.len) ? chunk_size : cursor.len;
845         struct aws_byte_cursor chunk_cursor = aws_byte_cursor_from_array(cursor.ptr, bytes_to_write);
846 
847         int err = aws_pipe_write(&state->write_end, chunk_cursor, s_close_write_end_after_all_writes_completed, state);
848         if (err) {
849             goto error;
850         }
851 
852         aws_byte_cursor_advance(&cursor, bytes_to_write);
853     }
854 
855     return;
856 error:
857     s_signal_error(state);
858 }
859 
test_pipe_writes_are_fifo(struct pipe_state * state)860 static int test_pipe_writes_are_fifo(struct pipe_state *state) {
861 
862     s_schedule_read_end_task(state, s_read_everything_task);
863     s_schedule_write_end_task(state, s_write_in_simultaneous_chunks_task);
864 
865     ASSERT_SUCCESS(s_wait_for_results(state));
866 
867     ASSERT_SUCCESS(s_pipe_state_check_copied_data(state));
868 
869     return AWS_OP_SUCCESS;
870 }
871 
872 PIPE_TEST_CASE(pipe_writes_are_fifo, GIANT_BUFFER_SIZE);
873 
s_cancelled_on_write_completed(struct aws_pipe_write_end * write_end,int error_code,struct aws_byte_cursor src_buffer,void * user_data)874 static void s_cancelled_on_write_completed(
875     struct aws_pipe_write_end *write_end,
876     int error_code,
877     struct aws_byte_cursor src_buffer,
878     void *user_data) {
879 
880     (void)write_end;
881     struct pipe_state *state = user_data;
882 
883     int *write_status_code = state->test_data;
884     *write_status_code = error_code;
885 
886     if (!error_code) {
887         state->buffers.num_bytes_written += src_buffer.len;
888     }
889 
890     s_schedule_read_end_task(state, s_clean_up_read_end_task);
891 }
892 
s_write_then_clean_up_task(struct pipe_state * state)893 static void s_write_then_clean_up_task(struct pipe_state *state) {
894     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&state->buffers.src);
895     int err = aws_pipe_write(&state->write_end, cursor, s_cancelled_on_write_completed, state);
896     if (err) {
897         goto error;
898     }
899 
900     err = aws_pipe_clean_up_write_end(&state->write_end);
901     if (err) {
902         goto error;
903     }
904     s_signal_done_on_write_end_closed(state);
905 
906     return;
907 error:
908     s_signal_error(state);
909 }
910 
911 /* Perform an enormous write that can't possibly complete without a bit of reading.
912  * After kicking off the write operation, close the write-end.
913  * The write operation chould complete with an error status */
test_pipe_clean_up_cancels_pending_writes(struct pipe_state * state)914 static int test_pipe_clean_up_cancels_pending_writes(struct pipe_state *state) {
915     /* capture the status code from the on-write-complete callback */
916     int write_status_code = 0;
917     state->test_data = &write_status_code;
918 
919     s_schedule_write_end_task(state, s_write_then_clean_up_task);
920 
921     ASSERT_SUCCESS(s_wait_for_results(state));
922 
923     ASSERT_INT_EQUALS(AWS_IO_BROKEN_PIPE, write_status_code);
924     ASSERT_TRUE(state->buffers.num_bytes_written < state->buffer_size);
925 
926     return AWS_OP_SUCCESS;
927 }
928 
929 PIPE_TEST_CASE(pipe_clean_up_cancels_pending_writes, GIANT_BUFFER_SIZE);
930