/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/pipe.h>

#include <aws/common/task_scheduler.h>
#include <aws/io/event_loop.h>

#include <stdbool.h>
#include <stdio.h>

enum read_end_state {
    /* Pipe is open. */
    READ_END_STATE_OPEN,

    /* Pipe is open, user has subscribed, but async monitoring hasn't started yet.
     * Pipe moves to SUBSCRIBED state if async monitoring starts successfully,
     * or to SUBSCRIBE_ERROR state if it doesn't start successfully.
     * From any of the SUBSCRIBE* states, the pipe moves to OPEN state if the user unsubscribes. */
    READ_END_STATE_SUBSCRIBING,

    /* Pipe is open, user has subscribed, and user is receiving events delivered by async monitoring.
     * Async monitoring is paused once the file is known to be readable.
     * Async monitoring is resumed once the user reads all available bytes.
     * Pipe moves to SUBSCRIBE_ERROR state if async monitoring reports an error, or fails to restart.
     * Pipe moves to OPEN state if the user unsubscribes. */
    READ_END_STATE_SUBSCRIBED,

    /* Pipe is open, user has subscribed, and an error event has been delivered to the user.
     * No further error events are delivered to the user, and no more async monitoring occurs. */
    READ_END_STATE_SUBSCRIBE_ERROR,
};

/* Reasons to launch async monitoring of the read-end's handle */
enum monitoring_reason {
    MONITORING_BECAUSE_SUBSCRIBING = 1,
    MONITORING_BECAUSE_WAITING_FOR_DATA = 2,
    MONITORING_BECAUSE_ERROR_SUSPECTED = 4,
};

/* Async operations live in their own allocations.
 * This allows the pipe to be cleaned up without waiting for all outstanding operations to complete. */
struct async_operation {
    union {
        struct aws_overlapped overlapped;
        struct aws_task task;
    } op;

    struct aws_allocator *alloc;
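    /* True while the operation is outstanding (an overlapped read is pending, or the task is scheduled).
     * Cleared by the completion callback or task when it runs. */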
    bool is_active;
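    /* Set if the read-end is cleaned up while this operation is still active, so the completion
     * frees this allocation itself instead of touching the freed read-end. */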
    bool is_read_end_cleaned_up;
};

struct read_end_impl {
    struct aws_allocator *alloc;

    enum read_end_state state;

    struct aws_io_handle handle;

    struct aws_event_loop *event_loop;

    /* Async overlapped operation for monitoring pipe status.
     * This operation is re-used each time monitoring resumes.
     * Note that rapidly subscribing/unsubscribing could lead to the monitoring operation from a previous subscribe
     * still pending while the user is re-subscribing. */
    struct async_operation *async_monitoring;

    /* Async task operation used to deliver error reports. */
    struct async_operation *async_error_report;

    aws_pipe_on_readable_fn *on_readable_user_callback;
    void *on_readable_user_data;

    /* Error code that the error-reporting task will report. */
    int error_code_to_report;

    /* Reasons to restart monitoring once the current async operation completes.
     * Contains enum monitoring_reason flags. */
    uint8_t monitoring_request_reasons;
};

enum write_end_state {
    WRITE_END_STATE_CLOSING,
    WRITE_END_STATE_OPEN,
};

/* Data describing an async write request */
struct write_request {
    struct aws_byte_cursor original_cursor;
    aws_pipe_on_write_completed_fn *user_callback;
    void *user_data;
    struct aws_allocator *alloc;
    struct aws_overlapped overlapped;
    struct aws_linked_list_node list_node;
    bool is_write_end_cleaned_up;
};

struct write_end_impl {
    struct aws_allocator *alloc;
    enum write_end_state state;
    struct aws_io_handle handle;
    struct aws_event_loop *event_loop;

    /* List of currently active write_requests */
    struct aws_linked_list write_list;

    /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around
     * and re-using it whenever possible */
};

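/* PIPE_BUFFER_SIZE is the in/out buffer size passed to CreateNamedPipeA() below.
 * PIPE_UNIQUE_NAME_MAX_TRIES bounds how many freshly generated names are tried if CreateNamedPipeA()
 * keeps failing (e.g. due to a name collision) before giving up. */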
enum {
    PIPE_BUFFER_SIZE = 4096,
    PIPE_UNIQUE_NAME_MAX_TRIES = 10,
};

static void s_read_end_on_zero_byte_read_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred);
static void s_read_end_report_error_task(struct aws_task *task, void *user_data, enum aws_task_status status);
static void s_write_end_on_write_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred);

/* Translate Windows errors into aws_pipe errors */
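/* Note: completion callbacks can hand back raw NTSTATUS values as well as Win32 error codes,
 * which is why both forms appear in the switch below. */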
static int s_translate_windows_error(DWORD win_error) {
    switch (win_error) {
        case ERROR_BROKEN_PIPE:
            return AWS_IO_BROKEN_PIPE;
        case 0xC000014B: /* STATUS_PIPE_BROKEN */
            return AWS_IO_BROKEN_PIPE;
        case 0xC0000120: /* STATUS_CANCELLED */
            return AWS_IO_BROKEN_PIPE;
        default:
            return AWS_ERROR_SYS_CALL_FAILURE;
    }
}

static int s_raise_last_windows_error(void) {
    DWORD win_error = GetLastError();
    int aws_error = s_translate_windows_error(win_error);
    return aws_raise_error(aws_error);
}

AWS_THREAD_LOCAL uint32_t tl_unique_name_counter = 0;

AWS_IO_API int aws_pipe_get_unique_name(char *dst, size_t dst_size) {
    /* For local pipes, name should be unique per-machine.
     * Mix together several sources that should lead to something unique. */

    DWORD process_id = GetCurrentProcessId();

    DWORD thread_id = GetCurrentThreadId();

    uint32_t counter = tl_unique_name_counter++;

    LARGE_INTEGER timestamp;
    bool success = QueryPerformanceCounter(&timestamp);
    AWS_ASSERT(success);
    (void)success; /* QueryPerformanceCounter() always succeeds on XP and later */

    /* snprintf() returns the number of characters (not including '\0') that would have been written if dst_size were ignored */
    int ideal_strlen = snprintf(
        dst,
        dst_size,
        "\\\\.\\pipe\\aws_pipe_%08x_%08x_%08x_%08x%08x",
        process_id,
        thread_id,
        counter,
        timestamp.HighPart,
        timestamp.LowPart);

    AWS_ASSERT(ideal_strlen > 0);
    if (dst_size < (size_t)(ideal_strlen + 1)) {
        return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
    }

    return AWS_OP_SUCCESS;
}

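/* Rough usage sketch (read_loop, write_loop, allocator, and the s_on_* callbacks below are illustrative
 * placeholders supplied by the caller, not part of this file):
 *
 *   struct aws_pipe_read_end read_end;
 *   struct aws_pipe_write_end write_end;
 *   if (aws_pipe_init(&read_end, read_loop, &write_end, write_loop, allocator)) {
 *       // handle error
 *   }
 *
 *   // On write_loop's thread:
 *   aws_pipe_write(&write_end, aws_byte_cursor_from_c_str("hello"), s_on_write_completed, NULL);
 *
 *   // On read_loop's thread:
 *   aws_pipe_subscribe_to_readable_events(&read_end, s_on_readable, NULL);
 *   // ... inside s_on_readable(), call aws_pipe_read() until it raises AWS_IO_READ_WOULD_BLOCK ...
 *
 *   // Each end must be cleaned up on its own event-loop's thread:
 *   aws_pipe_clean_up_read_end(&read_end);
 *   aws_pipe_clean_up_write_end(&write_end);
 */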
int aws_pipe_init(
    struct aws_pipe_read_end *read_end,
    struct aws_event_loop *read_end_event_loop,
    struct aws_pipe_write_end *write_end,
    struct aws_event_loop *write_end_event_loop,
    struct aws_allocator *allocator) {

    AWS_ASSERT(read_end);
    AWS_ASSERT(read_end_event_loop);
    AWS_ASSERT(write_end);
    AWS_ASSERT(write_end_event_loop);
    AWS_ASSERT(allocator);

    AWS_ZERO_STRUCT(*write_end);
    AWS_ZERO_STRUCT(*read_end);

    struct write_end_impl *write_impl = NULL;
    struct read_end_impl *read_impl = NULL;

    /* Init write-end */
    write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl));
    if (!write_impl) {
        goto clean_up;
    }

    write_impl->alloc = allocator;
    write_impl->state = WRITE_END_STATE_OPEN;
    write_impl->handle.data.handle = INVALID_HANDLE_VALUE;
    aws_linked_list_init(&write_impl->write_list);

    /* Anonymous pipes don't support overlapped I/O so named pipes are used. Names must be unique system-wide.
     * We generate random names, but collisions are theoretically possible, so try several times before giving up. */
    char pipe_name[256];
    int tries = 0;
    while (true) {
        int err = aws_pipe_get_unique_name(pipe_name, sizeof(pipe_name));
        if (err) {
            goto clean_up;
        }

        const DWORD open_mode = PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE;

        const DWORD pipe_mode = PIPE_TYPE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS;

        write_impl->handle.data.handle = CreateNamedPipeA(
            pipe_name,
            open_mode,
            pipe_mode,
            1,                /*nMaxInstances*/
            PIPE_BUFFER_SIZE, /*nOutBufferSize*/
            PIPE_BUFFER_SIZE, /*nInBufferSize*/
            0,                /*nDefaultTimeout: 0 means default*/
            NULL);            /*lpSecurityAttributes: NULL means default */

        if (write_impl->handle.data.handle != INVALID_HANDLE_VALUE) {
            /* Success, break out of loop */
            break;
        }

        if (++tries >= PIPE_UNIQUE_NAME_MAX_TRIES) {
            s_raise_last_windows_error();
            goto clean_up;
        }
    }

    int err = aws_event_loop_connect_handle_to_io_completion_port(write_end_event_loop, &write_impl->handle);
    if (err) {
        goto clean_up;
    }

    write_impl->event_loop = write_end_event_loop;

    /* Init read-end */
    read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl));
    if (!read_impl) {
        goto clean_up;
    }

    read_impl->alloc = allocator;
    read_impl->state = READ_END_STATE_OPEN;
    read_impl->handle.data.handle = INVALID_HANDLE_VALUE;

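    /* Open the client (read) side of the named pipe created above. FILE_FLAG_OVERLAPPED is required so the
     * zero-byte monitoring reads can be driven through the event loop's I/O completion port. */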
    read_impl->handle.data.handle = CreateFileA(
        pipe_name,     /*lpFileName*/
        GENERIC_READ,  /*dwDesiredAccess*/
        0,             /*dwShareMode: 0 prevents access by external processes*/
        NULL,          /*lpSecurityAttributes: NULL prevents inheritance by child processes*/
        OPEN_EXISTING, /*dwCreationDisposition*/
        FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, /*dwFlagsAndAttributes*/
        NULL);                                        /*hTemplateFile: ignored when opening existing file*/

    if (read_impl->handle.data.handle == INVALID_HANDLE_VALUE) {
        s_raise_last_windows_error();
        goto clean_up;
    }

    err = aws_event_loop_connect_handle_to_io_completion_port(read_end_event_loop, &read_impl->handle);
    if (err) {
        goto clean_up;
    }

    read_impl->event_loop = read_end_event_loop;

    /* Init the read-end's async operations */
    read_impl->async_monitoring = aws_mem_calloc(allocator, 1, sizeof(struct async_operation));
    if (!read_impl->async_monitoring) {
        goto clean_up;
    }

    read_impl->async_monitoring->alloc = allocator;
    aws_overlapped_init(&read_impl->async_monitoring->op.overlapped, s_read_end_on_zero_byte_read_completion, read_end);

    read_impl->async_error_report = aws_mem_calloc(allocator, 1, sizeof(struct async_operation));
    if (!read_impl->async_error_report) {
        goto clean_up;
    }

    read_impl->async_error_report->alloc = allocator;
    aws_task_init(
        &read_impl->async_error_report->op.task, s_read_end_report_error_task, read_end, "pipe_read_end_report_error");

    /* Success */
    write_end->impl_data = write_impl;
    read_end->impl_data = read_impl;
    return AWS_OP_SUCCESS;

clean_up:
    if (write_impl) {
        if (write_impl->handle.data.handle != INVALID_HANDLE_VALUE) {
            CloseHandle(write_impl->handle.data.handle);
        }

        aws_mem_release(allocator, write_impl);
        write_impl = NULL;
    }

    if (read_impl) {
        if (read_impl->handle.data.handle != INVALID_HANDLE_VALUE) {
            CloseHandle(read_impl->handle.data.handle);
        }

        if (read_impl->async_monitoring) {
            aws_mem_release(allocator, read_impl->async_monitoring);
        }

        if (read_impl->async_error_report) {
            aws_mem_release(allocator, read_impl->async_error_report);
        }

        aws_mem_release(allocator, read_impl);
        read_impl = NULL;
    }

    return AWS_OP_ERR;
}

struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) {
    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        aws_raise_error(AWS_IO_BROKEN_PIPE);
        return NULL;
    }

    return read_impl->event_loop;
}

struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) {
    struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        aws_raise_error(AWS_IO_BROKEN_PIPE);
        return NULL;
    }

    return write_impl->event_loop;
}

int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) {

    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    CloseHandle(read_impl->handle.data.handle);

    /* If the async operations are inactive they can be deleted now.
     * Otherwise, inform the operations of the clean-up so they can delete themselves upon completion. */
    if (!read_impl->async_monitoring->is_active) {
        aws_mem_release(read_impl->alloc, read_impl->async_monitoring);
    } else {
        read_impl->async_monitoring->is_read_end_cleaned_up = true;
    }

    if (!read_impl->async_error_report->is_active) {
        aws_mem_release(read_impl->alloc, read_impl->async_error_report);
    } else {
        read_impl->async_error_report->is_read_end_cleaned_up = true;
    }

    aws_mem_release(read_impl->alloc, read_impl);
    AWS_ZERO_STRUCT(*read_end);

    return AWS_OP_SUCCESS;
}

/* Return whether a user is subscribed to receive read events */
static bool s_read_end_is_subscribed(struct aws_pipe_read_end *read_end) {
    struct read_end_impl *read_impl = read_end->impl_data;
    switch (read_impl->state) {
        case READ_END_STATE_SUBSCRIBING:
        case READ_END_STATE_SUBSCRIBED:
        case READ_END_STATE_SUBSCRIBE_ERROR:
            return true;
        default:
            return false;
    }
}

/* Detect events on the pipe by kicking off an async zero-byte-read.
 * When the pipe becomes readable or an error occurs, the read will
 * complete and we will report the event. */
static void s_read_end_request_async_monitoring(struct aws_pipe_read_end *read_end, int request_reason) {
    struct read_end_impl *read_impl = read_end->impl_data;
    AWS_ASSERT(read_impl);

    /* We only do async monitoring while user is subscribed, but not if we've
     * reported an error and moved into the SUBSCRIBE_ERROR state */
    bool async_monitoring_allowed =
        s_read_end_is_subscribed(read_end) && (read_impl->state != READ_END_STATE_SUBSCRIBE_ERROR);
    if (!async_monitoring_allowed) {
        return;
    }

    /* We can only have one monitoring operation active at a time. Save off
     * the reason for the request. When the current operation completes,
     * if this reason is still valid, we'll re-launch async monitoring */
    if (read_impl->async_monitoring->is_active) {
        read_impl->monitoring_request_reasons |= request_reason;
        return;
    }

    AWS_ASSERT(read_impl->error_code_to_report == 0);

    read_impl->monitoring_request_reasons = 0;
    read_impl->state = READ_END_STATE_SUBSCRIBED;

    /* aws_overlapped must be reset before each use */
    aws_overlapped_reset(&read_impl->async_monitoring->op.overlapped);

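    /* A zero-byte ReadFile() transfers no data; the overlapped operation simply completes once the pipe
     * has data available (or has broken), which is exactly the readability signal we want.
     * fake_buffer only exists to give ReadFile() a non-NULL buffer address. */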
    int fake_buffer;
    bool success = ReadFile(
        read_impl->handle.data.handle,
        &fake_buffer,
        0,    /*nNumberOfBytesToRead*/
        NULL, /*lpNumberOfBytesRead: NULL for an overlapped operation*/
        &read_impl->async_monitoring->op.overlapped.overlapped);

    if (success || (GetLastError() == ERROR_IO_PENDING)) {
        /* Success launching zero-byte-read, aka async monitoring operation */
        read_impl->async_monitoring->is_active = true;
        return;
    }

    /* User is subscribed for IO events and expects to be notified of errors via the event callback.
     * We schedule this as a task so the callback doesn't happen before the user expects it.
     * We also set the state to SUBSCRIBE_ERROR so we don't keep trying to monitor the file. */
    read_impl->state = READ_END_STATE_SUBSCRIBE_ERROR;
    read_impl->error_code_to_report = s_translate_windows_error(GetLastError());
    read_impl->async_error_report->is_active = true;
    aws_event_loop_schedule_task_now(read_impl->event_loop, &read_impl->async_error_report->op.task);
}

static void s_read_end_report_error_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)status; /* Do same work whether or not this is a "cancelled" task */

    struct async_operation *async_op = AWS_CONTAINER_OF(task, struct async_operation, op);
    AWS_ASSERT(async_op->is_active);
    async_op->is_active = false;

    /* If the read end has been cleaned up, don't report the error, just free the task's memory. */
    if (async_op->is_read_end_cleaned_up) {
        aws_mem_release(async_op->alloc, async_op);
        return;
    }

    struct aws_pipe_read_end *read_end = user_data;
    struct read_end_impl *read_impl = read_end->impl_data;
    AWS_ASSERT(read_impl);

    /* Only report the error if we're still in the SUBSCRIBE_ERROR state.
     * If the user unsubscribed since this task was queued, then we'd be in a different state. */
    if (read_impl->state == READ_END_STATE_SUBSCRIBE_ERROR) {
        AWS_ASSERT(read_impl->error_code_to_report != 0);

        if (read_impl->on_readable_user_callback) {
            read_impl->on_readable_user_callback(
                read_end, read_impl->error_code_to_report, read_impl->on_readable_user_data);
        }
    }
}

static void s_read_end_on_zero_byte_read_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred) {

    (void)event_loop;
    (void)num_bytes_transferred;

    struct async_operation *async_op = AWS_CONTAINER_OF(overlapped, struct async_operation, op);

    /* If the read-end has been cleaned up, simply free the operation's memory and return. */
    if (async_op->is_read_end_cleaned_up) {
        aws_mem_release(async_op->alloc, async_op);
        return;
    }

    struct aws_pipe_read_end *read_end = overlapped->user_data;
    struct read_end_impl *read_impl = read_end->impl_data;
    AWS_ASSERT(read_impl);

    /* Only report events to user when in the SUBSCRIBED state.
     * If in the SUBSCRIBING state, this completion is from an operation begun during a previous subscription. */
    if (read_impl->state == READ_END_STATE_SUBSCRIBED) {
        int readable_error_code;

        if (status_code == 0) {
            readable_error_code = AWS_ERROR_SUCCESS;

            /* Clear out the "waiting for data" reason to restart zero-byte-read, since we're about to tell the user
             * that the pipe is readable. If the user consumes all the data, the "waiting for data" reason will get set
             * again and async-monitoring will be relaunched at the end of this function. */
            read_impl->monitoring_request_reasons &= ~MONITORING_BECAUSE_WAITING_FOR_DATA;

        } else {
            readable_error_code = AWS_IO_BROKEN_PIPE;

            /* Move pipe to SUBSCRIBE_ERROR state to prevent further monitoring */
            read_impl->state = READ_END_STATE_SUBSCRIBE_ERROR;
        }

        if (read_impl->on_readable_user_callback) {
            read_impl->on_readable_user_callback(read_end, readable_error_code, read_impl->on_readable_user_data);
        }
    }

    /* Note that the user callback might have invoked aws_pipe_clean_up_read_end().
     * If so, clean up the operation's memory.
     * Otherwise, relaunch the monitoring operation if there's a reason to do so */
    AWS_ASSERT(async_op->is_active);
    async_op->is_active = false;

    if (async_op->is_read_end_cleaned_up) {
        aws_mem_release(async_op->alloc, async_op);
    } else if (read_impl->monitoring_request_reasons != 0) {
        s_read_end_request_async_monitoring(read_end, read_impl->monitoring_request_reasons);
    }
}

int aws_pipe_subscribe_to_readable_events(
    struct aws_pipe_read_end *read_end,
    aws_pipe_on_readable_fn *on_readable,
    void *user_data) {

    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (read_impl->state != READ_END_STATE_OPEN) {
        /* Return specific error about why user can't subscribe */
        if (s_read_end_is_subscribed(read_end)) {
            return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
        }

        AWS_ASSERT(0); /* Unexpected state */
        return aws_raise_error(AWS_ERROR_UNKNOWN);
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

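    /* Enter SUBSCRIBING first; s_read_end_request_async_monitoring() advances the state to SUBSCRIBED once the
     * zero-byte-read is launched, or to SUBSCRIBE_ERROR (and schedules an error report) if launching fails. */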
    read_impl->state = READ_END_STATE_SUBSCRIBING;
    read_impl->on_readable_user_callback = on_readable;
    read_impl->on_readable_user_data = user_data;

    s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_SUBSCRIBING);

    return AWS_OP_SUCCESS;
}

int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) {
    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!s_read_end_is_subscribed(read_end)) {
        return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED);
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    read_impl->state = READ_END_STATE_OPEN;
    read_impl->on_readable_user_callback = NULL;
    read_impl->on_readable_user_data = NULL;
    read_impl->monitoring_request_reasons = 0;
    read_impl->error_code_to_report = 0;

    /* If there's a chance the zero-byte-read is pending, cancel it.
     * s_read_end_on_zero_byte_read_completion() will see status code
     * ERROR_OPERATION_ABORTED, but won't pass the event to the user
     * because we're not in the SUBSCRIBED state anymore. */
    if (read_impl->async_monitoring->is_active) {
        CancelIo(read_impl->handle.data.handle);
    }

    return AWS_OP_SUCCESS;
}

int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *amount_read) {
    AWS_ASSERT(dst_buffer && dst_buffer->buffer);

    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (amount_read) {
        *amount_read = 0;
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    /* Just return success if user requests 0 data */
    if (dst_buffer->capacity <= dst_buffer->len) {
        return AWS_OP_SUCCESS;
    }

    /* ReadFile() will be called in synchronous mode and would block indefinitely if it asked for more bytes than are
     * currently available. Therefore, peek at the available bytes before performing the actual read. */
    DWORD bytes_available = 0;
    bool peek_success = PeekNamedPipe(
        read_impl->handle.data.handle,
        NULL,             /*lpBuffer: NULL so peek doesn't actually copy data */
        0,                /*nBufferSize*/
        NULL,             /*lpBytesRead*/
        &bytes_available, /*lpTotalBytesAvail*/
        NULL);            /*lpBytesLeftThisMessage: doesn't apply to byte-type pipes*/

    /* If the peek failed, request async monitoring so the user is informed of the handle error via
     * aws_pipe_on_readable_fn. */
    if (!peek_success) {
        s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_ERROR_SUSPECTED);
        return s_raise_last_windows_error();
    }

    /* If no data is available, request async monitoring so the user is notified when data becomes available. */
    if (bytes_available == 0) {
        s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_WAITING_FOR_DATA);
        return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
    }

    size_t bytes_to_read = dst_buffer->capacity - dst_buffer->len;
    if (bytes_to_read > bytes_available) {
        bytes_to_read = bytes_available;
    }

    DWORD bytes_read = 0;
    bool read_success = ReadFile(
        read_impl->handle.data.handle,
        dst_buffer->buffer + dst_buffer->len, /*lpBuffer*/
        (DWORD)bytes_to_read,                 /*nNumberOfBytesToRead*/
        &bytes_read,                          /*lpNumberOfBytesRead*/
        NULL);                                /*lpOverlapped: NULL so read is synchronous*/

    /* Operation failed. Request async monitoring so user is informed via aws_pipe_on_readable_fn of handle error. */
    if (!read_success) {
        s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_ERROR_SUSPECTED);
        return s_raise_last_windows_error();
    }

    /* Success */
    dst_buffer->len += bytes_read;

    if (amount_read) {
        *amount_read = bytes_read;
    }

    if (bytes_read < bytes_to_read) {
        /* If we weren't able to read as many bytes as the user requested, that's ok.
         * Request async monitoring so we can alert the user when more data arrives */
        s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_WAITING_FOR_DATA);
    }

    return AWS_OP_SUCCESS;
}

int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {

    struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    CloseHandle(write_impl->handle.data.handle);

    /* Inform outstanding writes about the clean up. */
    while (!aws_linked_list_empty(&write_impl->write_list)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
        struct write_request *write_req = AWS_CONTAINER_OF(node, struct write_request, list_node);
        write_req->is_write_end_cleaned_up = true;
    }
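
    /* The requests themselves are not freed here; CloseHandle() above causes pending overlapped writes to
     * complete with an error, and s_write_end_on_write_completion() frees each request when it runs. */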

    aws_mem_release(write_impl->alloc, write_impl);
    AWS_ZERO_STRUCT(*write_end);

    return AWS_OP_SUCCESS;
}

int aws_pipe_write(
    struct aws_pipe_write_end *write_end,
    struct aws_byte_cursor src_buffer,
    aws_pipe_on_write_completed_fn *on_completed,
    void *user_data) {

    struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    if (src_buffer.len > MAXDWORD) {
        return aws_raise_error(AWS_ERROR_INVALID_BUFFER_SIZE);
    }
    DWORD num_bytes_to_write = (DWORD)src_buffer.len;

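    /* Each write gets its own heap-allocated request so its OVERLAPPED stays valid until the IOCP completion
     * fires, even if the write-end is cleaned up first. The request is freed in s_write_end_on_write_completion(). */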
    struct write_request *write = aws_mem_acquire(write_impl->alloc, sizeof(struct write_request));
    if (!write) {
        return AWS_OP_ERR;
    }

    AWS_ZERO_STRUCT(*write);
    write->original_cursor = src_buffer;
    write->user_callback = on_completed;
    write->user_data = user_data;
    write->alloc = write_impl->alloc;
    aws_overlapped_init(&write->overlapped, s_write_end_on_write_completion, write_end);

    bool write_success = WriteFile(
        write_impl->handle.data.handle, /*hFile*/
        src_buffer.ptr,                 /*lpBuffer*/
        num_bytes_to_write,             /*nNumberOfBytesToWrite*/
        NULL,                           /*lpNumberOfBytesWritten*/
        &write->overlapped.overlapped); /*lpOverlapped*/

    /* Overlapped WriteFile() calls may succeed immediately, or they may queue the work. In either of these cases, IOCP
     * on the event-loop will alert us when the operation completes and we'll invoke user callbacks then. */
    if (!write_success && GetLastError() != ERROR_IO_PENDING) {
        aws_mem_release(write_impl->alloc, write);
        return s_raise_last_windows_error();
    }

    aws_linked_list_push_back(&write_impl->write_list, &write->list_node);
    return AWS_OP_SUCCESS;
}

static void s_write_end_on_write_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred) {

    (void)event_loop;
    (void)num_bytes_transferred;

    struct write_request *write_request = AWS_CONTAINER_OF(overlapped, struct write_request, overlapped);
    struct aws_pipe_write_end *write_end = write_request->is_write_end_cleaned_up ? NULL : overlapped->user_data;

    AWS_ASSERT((num_bytes_transferred == write_request->original_cursor.len) || status_code);

    struct aws_byte_cursor original_cursor = write_request->original_cursor;
    aws_pipe_on_write_completed_fn *user_callback = write_request->user_callback;
    void *user_data = write_request->user_data;

    /* Clean up write-request.
     * Note that write-end might have been cleaned up before this executes. */
    if (!write_request->is_write_end_cleaned_up) {
        aws_linked_list_remove(&write_request->list_node);
    }

    aws_mem_release(write_request->alloc, write_request);

    /* Report outcome to user */
    if (user_callback) {

        int error_code = AWS_ERROR_SUCCESS;
        if (status_code != 0) {
            error_code = s_translate_windows_error(status_code);
        }

        /* Note that user may choose to clean up write-end in this callback */
        user_callback(write_end, error_code, original_cursor, user_data);
    }
}