1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/io/pipe.h>
7
8 #include <aws/common/task_scheduler.h>
9 #include <aws/io/event_loop.h>
10
11 #include <stdbool.h>
12 #include <stdio.h>
13
/* Subscription life-cycle of the pipe's read-end.
 * The state decides whether async monitoring may run and whether events are forwarded to the user. */
enum read_end_state {
    /* Pipe is open. */
    READ_END_STATE_OPEN,

    /* Pipe is open, user has subscribed, but async monitoring hasn't started yet.
     * Pipe moves to SUBSCRIBED state if async monitoring starts successfully
     * or SUBSCRIBE_ERROR state if it doesn't start successfully.
     * From any of the SUBSCRIBE* states, the pipe moves to OPEN state if the user unsubscribes. */
    READ_END_STATE_SUBSCRIBING,

    /* Pipe is open, user has subscribed, and user is receiving events delivered by async monitoring.
     * Async monitoring is paused once the file is known to be readable.
     * Async monitoring is resumed once the user reads all available bytes.
     * Pipe moves to SUBSCRIBE_ERROR state if async monitoring reports an error, or fails to restart.
     * Pipe moves to OPEN state if user unsubscribes. */
    READ_END_STATE_SUBSCRIBED,

    /* Pipe is open, user has subscribed, and an error event has been delivered to the user
     * (or is queued for delivery by the error-reporting task).
     * No further error events are delivered to the user, and no more async monitoring occurs. */
    READ_END_STATE_SUBSCRIBE_ERROR,
};
35
/* Reasons to launch async monitoring of the read-end's handle.
 * Each value is a distinct bit so that several reasons can be OR'd together in
 * read_end_impl.monitoring_request_reasons while a monitoring operation is already in flight. */
enum monitoring_reason {
    MONITORING_BECAUSE_SUBSCRIBING = 1,      /* user just subscribed to readable events */
    MONITORING_BECAUSE_WAITING_FOR_DATA = 2, /* a read found no (or not enough) data */
    MONITORING_BECAUSE_ERROR_SUSPECTED = 4,  /* a peek/read call on the handle failed */
};
42
/* Async operations live in their own allocations.
 * This allows the pipe to be cleaned up without waiting for all outstanding operations to complete. */
struct async_operation {
    /* Only one member is used per operation: the monitoring operation uses `overlapped`,
     * the error-report operation uses `task`. */
    union {
        struct aws_overlapped overlapped;
        struct aws_task task;
    } op;

    /* Allocator used to free this operation. Stored here because the owning read_end_impl
     * may already be gone when the operation completes. */
    struct aws_allocator *alloc;

    /* True from the moment the operation is launched/scheduled until its completion handler runs. */
    bool is_active;

    /* Set by aws_pipe_clean_up_read_end() while the operation is still active.
     * The completion handler then frees the operation instead of touching the read-end. */
    bool is_read_end_cleaned_up;
};
55
/* Private state behind aws_pipe_read_end.impl_data. */
struct read_end_impl {
    /* Allocator that owns this struct and its async operations. */
    struct aws_allocator *alloc;

    /* Current subscription state, see enum read_end_state. */
    enum read_end_state state;

    /* Read side of the named pipe, opened with FILE_FLAG_OVERLAPPED. */
    struct aws_io_handle handle;

    /* Event loop the handle is connected to; the read-end API must be called from its thread. */
    struct aws_event_loop *event_loop;

    /* Async overlapped operation for monitoring pipe status.
     * This operation is re-used each time monitoring resumes.
     * Note that rapidly subscribing/unsubscribing could lead to the monitoring operation from a previous subscribe
     * still pending while the user is re-subscribing. */
    struct async_operation *async_monitoring;

    /* Async task operation used to deliver error reports. */
    struct async_operation *async_error_report;

    /* User's readable-event callback and its context; both NULL while unsubscribed. */
    aws_pipe_on_readable_fn *on_readable_user_callback;
    void *on_readable_user_data;

    /* Error code that the error-reporting task will report. */
    int error_code_to_report;

    /* Reasons to restart monitoring once current async operation completes.
     * Contains enum monitoring_reason flags OR'd together. */
    uint8_t monitoring_request_reasons;
};
84
/* State of the pipe's write-end.
 * NOTE(review): within this file only WRITE_END_STATE_OPEN is ever assigned (in aws_pipe_init());
 * WRITE_END_STATE_CLOSING is not referenced here - confirm whether it is still needed. */
enum write_end_state {
    WRITE_END_STATE_CLOSING,
    WRITE_END_STATE_OPEN,
};
89
/* Data describing an async write request.
 * One is allocated per aws_pipe_write() call and freed by s_write_end_on_write_completion(). */
struct write_request {
    /* The cursor the user asked to write; handed back to the user in the completion callback. */
    struct aws_byte_cursor original_cursor;

    /* Optional completion callback and its context. */
    aws_pipe_on_write_completed_fn *user_callback;
    void *user_data;

    /* Allocator used to free this request (the write-end may be gone by completion time). */
    struct aws_allocator *alloc;

    /* Overlapped struct passed to WriteFile(); the completion handler recovers the request from it. */
    struct aws_overlapped overlapped;

    /* Links the request into write_end_impl.write_list while the write-end is alive. */
    struct aws_linked_list_node list_node;

    /* Set by aws_pipe_clean_up_write_end(); tells the completion handler that the request is no
     * longer in any list and that the write-end must not be dereferenced. */
    bool is_write_end_cleaned_up;
};
100
/* Private state behind aws_pipe_write_end.impl_data. */
struct write_end_impl {
    /* Allocator that owns this struct and every write_request. */
    struct aws_allocator *alloc;

    /* Set to WRITE_END_STATE_OPEN by aws_pipe_init(). */
    enum write_end_state state;

    /* Write side of the named pipe, created with FILE_FLAG_OVERLAPPED. */
    struct aws_io_handle handle;

    /* Event loop the handle is connected to; the write-end API must be called from its thread. */
    struct aws_event_loop *event_loop;

    /* List of currently active write_requests */
    struct aws_linked_list write_list;

    /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around
     * and re-using it whenever possible */
};
113
/* Tuning constants used by aws_pipe_init(). */
enum {
    /* Passed to CreateNamedPipeA() as both the outbound and inbound buffer size, in bytes. */
    PIPE_BUFFER_SIZE = 4096,

    /* Number of differently-named CreateNamedPipeA() attempts before aws_pipe_init() gives up. */
    PIPE_UNIQUE_NAME_MAX_TRIES = 10,
};
118
/* Forward declarations of the completion handlers.
 * They are registered (via aws_overlapped_init() / aws_task_init()) by code that appears
 * earlier in this file than their definitions. */

/* IOCP completion handler for the read-end's zero-byte-read monitoring operation. */
static void s_read_end_on_zero_byte_read_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred);

/* Task that delivers a queued error to the read-end's subscriber. */
static void s_read_end_report_error_task(struct aws_task *task, void *user_data, enum aws_task_status status);

/* IOCP completion handler for an overlapped write issued by aws_pipe_write(). */
static void s_write_end_on_write_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred);
130
131 /* Translate Windows errors into aws_pipe errors */
s_translate_windows_error(DWORD win_error)132 static int s_translate_windows_error(DWORD win_error) {
133 switch (win_error) {
134 case ERROR_BROKEN_PIPE:
135 return AWS_IO_BROKEN_PIPE;
136 case 0xC000014B: /* STATUS_PIPE_BROKEN */
137 return AWS_IO_BROKEN_PIPE;
138 case 0xC0000120: /* STATUS_CANCELLED */
139 return AWS_IO_BROKEN_PIPE;
140 default:
141 return AWS_ERROR_SYS_CALL_FAILURE;
142 }
143 }
144
/* Raise the aws error that corresponds to the calling thread's current GetLastError() value.
 * Returns whatever aws_raise_error() returns, so callers can `return s_raise_last_windows_error();`. */
static int s_raise_last_windows_error(void) {
    return aws_raise_error(s_translate_windows_error(GetLastError()));
}
150
/* Per-thread counter mixed into generated pipe names; post-incremented by every call to
 * aws_pipe_get_unique_name() so that two names generated on the same thread always differ. */
AWS_THREAD_LOCAL uint32_t tl_unique_name_counter = 0;
152
aws_pipe_get_unique_name(char * dst,size_t dst_size)153 AWS_IO_API int aws_pipe_get_unique_name(char *dst, size_t dst_size) {
154 /* For local pipes, name should be unique per-machine.
155 * Mix together several sources that should should lead to something unique. */
156
157 DWORD process_id = GetCurrentProcessId();
158
159 DWORD thread_id = GetCurrentThreadId();
160
161 uint32_t counter = tl_unique_name_counter++;
162
163 LARGE_INTEGER timestamp;
164 bool success = QueryPerformanceCounter(×tamp);
165 AWS_ASSERT(success);
166 (void)success; /* QueryPerformanceCounter() always succeeds on XP and later */
167
168 /* snprintf() returns number of characters (not including '\0') which would have written if dst_size was ignored */
169 int ideal_strlen = snprintf(
170 dst,
171 dst_size,
172 "\\\\.\\pipe\\aws_pipe_%08x_%08x_%08x_%08x%08x",
173 process_id,
174 thread_id,
175 counter,
176 timestamp.HighPart,
177 timestamp.LowPart);
178
179 AWS_ASSERT(ideal_strlen > 0);
180 if (dst_size < (size_t)(ideal_strlen + 1)) {
181 return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
182 }
183
184 return AWS_OP_SUCCESS;
185 }
186
/**
 * Create a connected pipe whose two ends are serviced by the given event loops.
 *
 * The write-end is created as the outbound side of a uniquely named pipe and the read-end is
 * obtained by opening that same name for reading. Both handles are opened for overlapped I/O and
 * connected to their event loop's IO completion port.
 *
 * @param read_end              Output; zeroed first, impl_data is set only on success.
 * @param read_end_event_loop   Event loop that will service the read-end.
 * @param write_end             Output; zeroed first, impl_data is set only on success.
 * @param write_end_event_loop  Event loop that will service the write-end.
 * @param allocator             Allocator for all internal state.
 * @return AWS_OP_SUCCESS, or AWS_OP_ERR after releasing everything acquired so far. Failures of the
 *         Windows calls raise an error via s_raise_last_windows_error(); the other failing callees
 *         are presumed to raise their own error - TODO confirm.
 */
int aws_pipe_init(
    struct aws_pipe_read_end *read_end,
    struct aws_event_loop *read_end_event_loop,
    struct aws_pipe_write_end *write_end,
    struct aws_event_loop *write_end_event_loop,
    struct aws_allocator *allocator) {

    AWS_ASSERT(read_end);
    AWS_ASSERT(read_end_event_loop);
    AWS_ASSERT(write_end);
    AWS_ASSERT(write_end_event_loop);
    AWS_ASSERT(allocator);

    /* Zero both outputs up front so that they are in the "no impl" state if we fail below. */
    AWS_ZERO_STRUCT(*write_end);
    AWS_ZERO_STRUCT(*read_end);

    /* Both start NULL so the clean_up label can tell what has been acquired. */
    struct write_end_impl *write_impl = NULL;
    struct read_end_impl *read_impl = NULL;

    /* Init write-end */
    write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl));
    if (!write_impl) {
        goto clean_up;
    }

    write_impl->alloc = allocator;
    write_impl->state = WRITE_END_STATE_OPEN;
    /* Mark the handle invalid until CreateNamedPipeA() succeeds, so clean_up won't close garbage. */
    write_impl->handle.data.handle = INVALID_HANDLE_VALUE;
    aws_linked_list_init(&write_impl->write_list);

    /* Anonymous pipes don't support overlapped I/O so named pipes are used. Names must be unique system-wide.
     * We generate random names, but collisions are theoretically possible, so try several times before giving up. */
    char pipe_name[256];
    int tries = 0;
    while (true) {
        int err = aws_pipe_get_unique_name(pipe_name, sizeof(pipe_name));
        if (err) {
            goto clean_up;
        }

        /* FILE_FLAG_FIRST_PIPE_INSTANCE makes creation fail if a pipe with this name already exists. */
        const DWORD open_mode = PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE;

        const DWORD pipe_mode = PIPE_TYPE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS;

        write_impl->handle.data.handle = CreateNamedPipeA(
            pipe_name,
            open_mode,
            pipe_mode,
            1,                /*nMaxInstances*/
            PIPE_BUFFER_SIZE, /*nOutBufferSize*/
            PIPE_BUFFER_SIZE, /*nInBufferSize*/
            0,                /*nDefaultTimeout: 0 means default*/
            NULL);            /*lpSecurityAttributes: NULL means default */

        if (write_impl->handle.data.handle != INVALID_HANDLE_VALUE) {
            /* Success, break out of loop */
            break;
        }

        /* Creation failed; retry with a fresh name unless the attempt budget is used up. */
        if (++tries >= PIPE_UNIQUE_NAME_MAX_TRIES) {
            s_raise_last_windows_error();
            goto clean_up;
        }
    }

    int err = aws_event_loop_connect_handle_to_io_completion_port(write_end_event_loop, &write_impl->handle);
    if (err) {
        goto clean_up;
    }

    write_impl->event_loop = write_end_event_loop;

    /* Init read-end */
    read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl));
    if (!read_impl) {
        goto clean_up;
    }

    read_impl->alloc = allocator;
    read_impl->state = READ_END_STATE_OPEN;
    /* Mark the handle invalid until CreateFileA() succeeds, so clean_up won't close garbage. */
    read_impl->handle.data.handle = INVALID_HANDLE_VALUE;

    /* Open the other side of the pipe that was just created above. */
    read_impl->handle.data.handle = CreateFileA(
        pipe_name,     /*lpFileName*/
        GENERIC_READ,  /*dwDesiredAccess*/
        0,             /*dwShareMode: 0 prevents access by external processes*/
        NULL,          /*lpSecurityAttributes: NULL prevents inheritance by child processes*/
        OPEN_EXISTING, /*dwCreationDisposition*/
        FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, /*dwFlagsAndAttributes*/
        NULL);         /*hTemplateFile: ignored when opening existing file*/

    if (read_impl->handle.data.handle == INVALID_HANDLE_VALUE) {
        s_raise_last_windows_error();
        goto clean_up;
    }

    err = aws_event_loop_connect_handle_to_io_completion_port(read_end_event_loop, &read_impl->handle);
    if (err) {
        goto clean_up;
    }

    read_impl->event_loop = read_end_event_loop;

    /* Init the read-end's async operations */
    read_impl->async_monitoring = aws_mem_calloc(allocator, 1, sizeof(struct async_operation));
    if (!read_impl->async_monitoring) {
        goto clean_up;
    }

    read_impl->async_monitoring->alloc = allocator;
    /* The user's read_end struct is the operation's user_data; the completion handler reads impl_data from it. */
    aws_overlapped_init(&read_impl->async_monitoring->op.overlapped, s_read_end_on_zero_byte_read_completion, read_end);

    read_impl->async_error_report = aws_mem_calloc(allocator, 1, sizeof(struct async_operation));
    if (!read_impl->async_error_report) {
        goto clean_up;
    }

    read_impl->async_error_report->alloc = allocator;
    aws_task_init(
        &read_impl->async_error_report->op.task, s_read_end_report_error_task, read_end, "pipe_read_end_report_error");

    /* Success */
    write_end->impl_data = write_impl;
    read_end->impl_data = read_impl;
    return AWS_OP_SUCCESS;

clean_up:
    /* Release whatever was acquired before the failure; the output structs stay zeroed. */
    if (write_impl) {
        if (write_impl->handle.data.handle != INVALID_HANDLE_VALUE) {
            CloseHandle(write_impl->handle.data.handle);
        }

        aws_mem_release(allocator, write_impl);
        write_impl = NULL;
    }

    if (read_impl) {
        if (read_impl->handle.data.handle != INVALID_HANDLE_VALUE) {
            CloseHandle(read_impl->handle.data.handle);
        }

        if (read_impl->async_monitoring) {
            aws_mem_release(allocator, read_impl->async_monitoring);
        }

        if (read_impl->async_error_report) {
            aws_mem_release(allocator, read_impl->async_error_report);
        }

        aws_mem_release(allocator, read_impl);
        read_impl = NULL;
    }

    return AWS_OP_ERR;
}
342
aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end * read_end)343 struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) {
344 struct read_end_impl *read_impl = read_end->impl_data;
345 if (!read_impl) {
346 aws_raise_error(AWS_IO_BROKEN_PIPE);
347 return NULL;
348 }
349
350 return read_impl->event_loop;
351 }
352
aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end * write_end)353 struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) {
354 struct write_end_impl *write_impl = write_end->impl_data;
355 if (!write_impl) {
356 aws_raise_error(AWS_IO_BROKEN_PIPE);
357 return NULL;
358 }
359
360 return write_impl->event_loop;
361 }
362
aws_pipe_clean_up_read_end(struct aws_pipe_read_end * read_end)363 int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) {
364
365 struct read_end_impl *read_impl = read_end->impl_data;
366 if (!read_impl) {
367 return aws_raise_error(AWS_IO_BROKEN_PIPE);
368 }
369
370 if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
371 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
372 }
373
374 CloseHandle(read_impl->handle.data.handle);
375
376 /* If the async operations are inactive they can be deleted now.
377 * Otherwise, inform the operations of the clean-up so they can delete themselves upon completion. */
378 if (!read_impl->async_monitoring->is_active) {
379 aws_mem_release(read_impl->alloc, read_impl->async_monitoring);
380 } else {
381 read_impl->async_monitoring->is_read_end_cleaned_up = true;
382 }
383
384 if (!read_impl->async_error_report->is_active) {
385 aws_mem_release(read_impl->alloc, read_impl->async_error_report);
386 } else {
387 read_impl->async_error_report->is_read_end_cleaned_up = true;
388 }
389
390 aws_mem_release(read_impl->alloc, read_impl);
391 AWS_ZERO_STRUCT(*read_end);
392
393 return AWS_OP_SUCCESS;
394 }
395
396 /* Return whether a user is subscribed to receive read events */
s_read_end_is_subscribed(struct aws_pipe_read_end * read_end)397 static bool s_read_end_is_subscribed(struct aws_pipe_read_end *read_end) {
398 struct read_end_impl *read_impl = read_end->impl_data;
399 switch (read_impl->state) {
400 case READ_END_STATE_SUBSCRIBING:
401 case READ_END_STATE_SUBSCRIBED:
402 case READ_END_STATE_SUBSCRIBE_ERROR:
403 return true;
404 default:
405 return false;
406 }
407 }
408
409 /* Detect events on the pipe by kicking off an async zero-byte-read.
410 * When the pipe becomes readable or an error occurs, the read will
411 * complete and we will report the event. */
s_read_end_request_async_monitoring(struct aws_pipe_read_end * read_end,int request_reason)412 static void s_read_end_request_async_monitoring(struct aws_pipe_read_end *read_end, int request_reason) {
413 struct read_end_impl *read_impl = read_end->impl_data;
414 AWS_ASSERT(read_impl);
415
416 /* We only do async monitoring while user is subscribed, but not if we've
417 * reported an error and moved into the SUBSCRIBE_ERROR state */
418 bool async_monitoring_allowed =
419 s_read_end_is_subscribed(read_end) && (read_impl->state != READ_END_STATE_SUBSCRIBE_ERROR);
420 if (!async_monitoring_allowed) {
421 return;
422 }
423
424 /* We can only have one monitoring operation active at a time. Save off
425 * the reason for the request. When the current operation completes,
426 * if this reason is still valid, we'll re-launch async monitoring */
427 if (read_impl->async_monitoring->is_active) {
428 read_impl->monitoring_request_reasons |= request_reason;
429 return;
430 }
431
432 AWS_ASSERT(read_impl->error_code_to_report == 0);
433
434 read_impl->monitoring_request_reasons = 0;
435 read_impl->state = READ_END_STATE_SUBSCRIBED;
436
437 /* aws_overlapped must be reset before each use */
438 aws_overlapped_reset(&read_impl->async_monitoring->op.overlapped);
439
440 int fake_buffer;
441 bool success = ReadFile(
442 read_impl->handle.data.handle,
443 &fake_buffer,
444 0, /*nNumberOfBytesToRead*/
445 NULL, /*lpNumberOfBytesRead: NULL for an overlapped operation*/
446 &read_impl->async_monitoring->op.overlapped.overlapped);
447
448 if (success || (GetLastError() == ERROR_IO_PENDING)) {
449 /* Success launching zero-byte-read, aka async monitoring operation */
450 read_impl->async_monitoring->is_active = true;
451 return;
452 }
453
454 /* User is subscribed for IO events and expects to be notified of errors via the event callback.
455 * We schedule this as a task so the callback doesn't happen before the user expects it.
456 * We also set the state to SUBSCRIBE_ERROR so we don't keep trying to monitor the file. */
457 read_impl->state = READ_END_STATE_SUBSCRIBE_ERROR;
458 read_impl->error_code_to_report = s_translate_windows_error(GetLastError());
459 read_impl->async_error_report->is_active = true;
460 aws_event_loop_schedule_task_now(read_impl->event_loop, &read_impl->async_error_report->op.task);
461 }
462
s_read_end_report_error_task(struct aws_task * task,void * user_data,enum aws_task_status status)463 static void s_read_end_report_error_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
464 (void)status; /* Do same work whether or not this is a "cancelled" task */
465
466 struct async_operation *async_op = AWS_CONTAINER_OF(task, struct async_operation, op);
467 AWS_ASSERT(async_op->is_active);
468 async_op->is_active = false;
469
470 /* If the read end has been cleaned up, don't report the error, just free the task's memory. */
471 if (async_op->is_read_end_cleaned_up) {
472 aws_mem_release(async_op->alloc, async_op);
473 return;
474 }
475
476 struct aws_pipe_read_end *read_end = user_data;
477 struct read_end_impl *read_impl = read_end->impl_data;
478 AWS_ASSERT(read_impl);
479
480 /* Only report the error if we're still in the SUBSCRIBE_ERROR state.
481 * If the user unsubscribed since this task was queued, then we'd be in a different state. */
482 if (read_impl->state == READ_END_STATE_SUBSCRIBE_ERROR) {
483 AWS_ASSERT(read_impl->error_code_to_report != 0);
484
485 if (read_impl->on_readable_user_callback) {
486 read_impl->on_readable_user_callback(
487 read_end, read_impl->error_code_to_report, read_impl->on_readable_user_data);
488 }
489 }
490 }
491
/* Completion handler for the read-end's zero-byte-read (the async monitoring operation).
 *
 * event_loop:            unused.
 * overlapped:            the aws_overlapped embedded in the monitoring async_operation; its
 *                        user_data is the struct aws_pipe_read_end given to aws_overlapped_init().
 * status_code:           0 means the pipe became readable; anything else is reported to the user
 *                        as AWS_IO_BROKEN_PIPE.
 * num_bytes_transferred: unused.
 *
 * The statement order matters: the user callback may clean up the read-end, after which read_impl
 * must not be touched and this handler becomes responsible for freeing the operation. */
static void s_read_end_on_zero_byte_read_completion(
    struct aws_event_loop *event_loop,
    struct aws_overlapped *overlapped,
    int status_code,
    size_t num_bytes_transferred) {

    (void)event_loop;
    (void)num_bytes_transferred;

    struct async_operation *async_op = AWS_CONTAINER_OF(overlapped, struct async_operation, op);

    /* If the read-end has been cleaned up, simply free the operation's memory and return. */
    if (async_op->is_read_end_cleaned_up) {
        aws_mem_release(async_op->alloc, async_op);
        return;
    }

    struct aws_pipe_read_end *read_end = overlapped->user_data;
    struct read_end_impl *read_impl = read_end->impl_data;
    AWS_ASSERT(read_impl);

    /* Only report events to user when in the SUBSCRIBED state.
     * If in the SUBSCRIBING state, this completion is from an operation begun during a previous subscription. */
    if (read_impl->state == READ_END_STATE_SUBSCRIBED) {
        int readable_error_code;

        if (status_code == 0) {
            readable_error_code = AWS_ERROR_SUCCESS;

            /* Clear out the "waiting for data" reason to restart zero-byte-read, since we're about to tell the user
             * that the pipe is readable. If the user consumes all the data, the "waiting for data" reason will get set
             * again and async-monitoring will be relaunched at the end of this function. */
            read_impl->monitoring_request_reasons &= ~MONITORING_BECAUSE_WAITING_FOR_DATA;

        } else {
            readable_error_code = AWS_IO_BROKEN_PIPE;

            /* Move pipe to SUBSCRIBE_ERROR state to prevent further monitoring */
            read_impl->state = READ_END_STATE_SUBSCRIBE_ERROR;
        }

        /* is_active is deliberately still true here: a clean-up performed inside the callback then
         * only flags the operation instead of freeing it underneath us. */
        if (read_impl->on_readable_user_callback) {
            read_impl->on_readable_user_callback(read_end, readable_error_code, read_impl->on_readable_user_data);
        }
    }

    /* Note that the user callback might have invoked aws_pipe_clean_up_read_end().
     * If so, clean up the operation's memory.
     * Otherwise, relaunch the monitoring operation if there's a reason to do so */
    AWS_ASSERT(async_op->is_active);
    async_op->is_active = false;

    /* Check the flag on async_op (not read_impl): read_impl is freed once the read-end is cleaned up. */
    if (async_op->is_read_end_cleaned_up) {
        aws_mem_release(async_op->alloc, async_op);
    } else if (read_impl->monitoring_request_reasons != 0) {
        s_read_end_request_async_monitoring(read_end, read_impl->monitoring_request_reasons);
    }
}
550
/**
 * Subscribe to readable/error events on the read-end. Must run on the read-end's event-loop thread.
 *
 * @param read_end     The read-end to monitor.
 * @param on_readable  Callback invoked when the pipe becomes readable or an error is detected.
 * @param user_data    Passed through to on_readable.
 * @return AWS_OP_SUCCESS once monitoring has been requested, or a raised error:
 *         AWS_IO_BROKEN_PIPE if the read-end has no implementation data,
 *         AWS_ERROR_IO_ALREADY_SUBSCRIBED if a subscription already exists,
 *         AWS_ERROR_UNKNOWN if the read-end is in an unexpected state,
 *         AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY if called from another thread.
 */
int aws_pipe_subscribe_to_readable_events(
    struct aws_pipe_read_end *read_end,
    aws_pipe_on_readable_fn *on_readable,
    void *user_data) {

    struct read_end_impl *impl = read_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    /* Subscribing is only possible from the OPEN state; say specifically why it is refused. */
    if (impl->state != READ_END_STATE_OPEN) {
        if (!s_read_end_is_subscribed(read_end)) {
            AWS_ASSERT(0); /* Unexpected state */
            return aws_raise_error(AWS_ERROR_UNKNOWN);
        }

        return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
    }

    if (!aws_event_loop_thread_is_callers_thread(impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    /* Record the subscriber, then kick off monitoring from the SUBSCRIBING state. */
    impl->on_readable_user_data = user_data;
    impl->on_readable_user_callback = on_readable;
    impl->state = READ_END_STATE_SUBSCRIBING;

    s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_SUBSCRIBING);

    return AWS_OP_SUCCESS;
}
583
aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end * read_end)584 int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) {
585 struct read_end_impl *read_impl = read_end->impl_data;
586 if (!read_impl) {
587 return aws_raise_error(AWS_IO_BROKEN_PIPE);
588 }
589
590 if (!s_read_end_is_subscribed(read_end)) {
591 return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED);
592 }
593
594 if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
595 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
596 }
597
598 read_impl->state = READ_END_STATE_OPEN;
599 read_impl->on_readable_user_callback = NULL;
600 read_impl->on_readable_user_data = NULL;
601 read_impl->monitoring_request_reasons = 0;
602 read_impl->error_code_to_report = 0;
603
604 /* If there's a chance the zero-byte-read is pending, cancel it.
605 * s_read_end_on_zero_byte_read_completion() will see status code
606 * ERROR_OPERATION_ABORTED, but won't pass the event to the user
607 * because we're not in the SUBSCRIBED state anymore. */
608 if (read_impl->async_monitoring->is_active) {
609 CancelIo(read_impl->handle.data.handle);
610 }
611
612 return AWS_OP_SUCCESS;
613 }
614
aws_pipe_read(struct aws_pipe_read_end * read_end,struct aws_byte_buf * dst_buffer,size_t * amount_read)615 int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *amount_read) {
616 AWS_ASSERT(dst_buffer && dst_buffer->buffer);
617
618 struct read_end_impl *read_impl = read_end->impl_data;
619 if (!read_impl) {
620 return aws_raise_error(AWS_IO_BROKEN_PIPE);
621 }
622
623 if (amount_read) {
624 *amount_read = 0;
625 }
626
627 if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
628 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
629 }
630
631 /* Just return success if user requests 0 data */
632 if (dst_buffer->capacity <= dst_buffer->len) {
633 return AWS_OP_SUCCESS;
634 }
635
636 /* ReadFile() will be called in synchronous mode and would block indefinitely if it asked for more bytes than are
637 * currently available. Therefore, peek at the available bytes before performing the actual read. */
638 DWORD bytes_available = 0;
639 bool peek_success = PeekNamedPipe(
640 read_impl->handle.data.handle,
641 NULL, /*lpBuffer: NULL so peek doesn't actually copy data */
642 0, /*nBufferSize*/
643 NULL, /*lpBytesRead*/
644 &bytes_available, /*lpTotalBytesAvail*/
645 NULL); /*lpBytesLeftThisMessage: doesn't apply to byte-type pipes*/
646
647 /* If operation failed. Request async monitoring so user is informed via aws_pipe_on_readable_fn of handle error. */
648 if (!peek_success) {
649 s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_ERROR_SUSPECTED);
650 return s_raise_last_windows_error();
651 }
652
653 /* If no data available. Request async monitoring so user is notified when data becomes available. */
654 if (bytes_available == 0) {
655 s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_WAITING_FOR_DATA);
656 return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
657 }
658
659 size_t bytes_to_read = dst_buffer->capacity - dst_buffer->len;
660 if (bytes_to_read > bytes_available) {
661 bytes_to_read = bytes_available;
662 }
663
664 DWORD bytes_read = 0;
665 bool read_success = ReadFile(
666 read_impl->handle.data.handle,
667 dst_buffer->buffer + dst_buffer->len, /*lpBuffer*/
668 (DWORD)bytes_to_read, /*nNumberOfBytesToRead*/
669 &bytes_read, /*lpNumberOfBytesRead*/
670 NULL); /*lpOverlapped: NULL so read is synchronous*/
671
672 /* Operation failed. Request async monitoring so user is informed via aws_pipe_on_readable_fn of handle error. */
673 if (!read_success) {
674 s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_ERROR_SUSPECTED);
675 return s_raise_last_windows_error();
676 }
677
678 /* Success */
679 dst_buffer->len += bytes_read;
680
681 if (amount_read) {
682 *amount_read = bytes_read;
683 }
684
685 if (bytes_read < bytes_to_read) {
686 /* If we weren't able to read as many bytes as the user requested, that's ok.
687 * Request async monitoring so we can alert the user when more data arrives */
688 s_read_end_request_async_monitoring(read_end, MONITORING_BECAUSE_WAITING_FOR_DATA);
689 }
690
691 return AWS_OP_SUCCESS;
692 }
693
aws_pipe_clean_up_write_end(struct aws_pipe_write_end * write_end)694 int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {
695
696 struct write_end_impl *write_impl = write_end->impl_data;
697 if (!write_impl) {
698 return aws_raise_error(AWS_IO_BROKEN_PIPE);
699 }
700
701 if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
702 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
703 }
704
705 CloseHandle(write_impl->handle.data.handle);
706
707 /* Inform outstanding writes about the clean up. */
708 while (!aws_linked_list_empty(&write_impl->write_list)) {
709 struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
710 struct write_request *write_req = AWS_CONTAINER_OF(node, struct write_request, list_node);
711 write_req->is_write_end_cleaned_up = true;
712 }
713
714 aws_mem_release(write_impl->alloc, write_impl);
715 AWS_ZERO_STRUCT(*write_end);
716
717 return AWS_OP_SUCCESS;
718 }
719
/**
 * Start an overlapped write of src_buffer to the pipe. Must run on the write-end's event-loop thread.
 * The memory behind src_buffer is handed to WriteFile() as-is (no copy is made here), so it must
 * stay valid until the write completes.
 *
 * @param write_end     The write-end to write to.
 * @param src_buffer    Bytes to write; its length must fit in a DWORD.
 * @param on_completed  Optional callback invoked from the IOCP completion handler.
 * @param user_data     Passed through to on_completed.
 * @return AWS_OP_SUCCESS if the write was issued (it may complete later), AWS_OP_ERR if the request
 *         could not be allocated, or a raised error:
 *         AWS_IO_BROKEN_PIPE if the write-end has no implementation data,
 *         AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY if called from another thread,
 *         AWS_ERROR_INVALID_BUFFER_SIZE if src_buffer is larger than MAXDWORD,
 *         the translated Windows error if WriteFile() fails outright.
 */
int aws_pipe_write(
    struct aws_pipe_write_end *write_end,
    struct aws_byte_cursor src_buffer,
    aws_pipe_on_write_completed_fn *on_completed,
    void *user_data) {

    struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    /* WriteFile() takes the length as a DWORD. */
    if (src_buffer.len > MAXDWORD) {
        return aws_raise_error(AWS_ERROR_INVALID_BUFFER_SIZE);
    }
    DWORD num_bytes_to_write = (DWORD)src_buffer.len;

    struct write_request *write = aws_mem_acquire(write_impl->alloc, sizeof(struct write_request));
    if (!write) {
        return AWS_OP_ERR;
    }

    AWS_ZERO_STRUCT(*write);
    write->original_cursor = src_buffer;
    write->user_callback = on_completed;
    write->user_data = user_data;
    write->alloc = write_impl->alloc;
    aws_overlapped_init(&write->overlapped, s_write_end_on_write_completion, write_end);

    bool write_success = WriteFile(
        write_impl->handle.data.handle, /*hFile*/
        src_buffer.ptr,                 /*lpBuffer*/
        num_bytes_to_write,             /*nNumberOfBytesToWrite*/
        NULL,                           /*lpNumberOfBytesWritten*/
        &write->overlapped.overlapped); /*lpOverlapped*/

    /* Overlapped WriteFile() calls may succeed immediately, or they may queue the work. In either of these cases, IOCP
     * on the event-loop will alert us when the operation completes and we'll invoke user callbacks then. */
    if (!write_success) {
        /* Capture the error before releasing memory: the allocator may overwrite the last-error value. */
        const DWORD win_error = GetLastError();
        if (win_error != ERROR_IO_PENDING) {
            aws_mem_release(write_impl->alloc, write);
            return aws_raise_error(s_translate_windows_error(win_error));
        }
    }

    /* Track the request so aws_pipe_clean_up_write_end() can flag it if the write-end goes away first. */
    aws_linked_list_push_back(&write_impl->write_list, &write->list_node);
    return AWS_OP_SUCCESS;
}
769
s_write_end_on_write_completion(struct aws_event_loop * event_loop,struct aws_overlapped * overlapped,int status_code,size_t num_bytes_transferred)770 void s_write_end_on_write_completion(
771 struct aws_event_loop *event_loop,
772 struct aws_overlapped *overlapped,
773 int status_code,
774 size_t num_bytes_transferred) {
775
776 (void)event_loop;
777 (void)num_bytes_transferred;
778
779 struct write_request *write_request = AWS_CONTAINER_OF(overlapped, struct write_request, overlapped);
780 struct aws_pipe_write_end *write_end = write_request->is_write_end_cleaned_up ? NULL : overlapped->user_data;
781
782 AWS_ASSERT((num_bytes_transferred == write_request->original_cursor.len) || status_code);
783
784 struct aws_byte_cursor original_cursor = write_request->original_cursor;
785 aws_pipe_on_write_completed_fn *user_callback = write_request->user_callback;
786 void *user_data = write_request->user_data;
787
788 /* Clean up write-request.
789 * Note that write-end might have been cleaned up before this executes. */
790 if (!write_request->is_write_end_cleaned_up) {
791 aws_linked_list_remove(&write_request->list_node);
792 }
793
794 aws_mem_release(write_request->alloc, write_request);
795
796 /* Report outcome to user */
797 if (user_callback) {
798
799 int error_code = AWS_ERROR_SUCCESS;
800 if (status_code != 0) {
801 error_code = s_translate_windows_error(status_code);
802 }
803
804 /* Note that user may choose to clean up write-end in this callback */
805 user_callback(write_end, error_code, original_cursor, user_data);
806 }
807 }
808