1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/io/socket.h>
7 
8 #include <aws/common/clock.h>
9 #include <aws/common/condition_variable.h>
10 #include <aws/common/mutex.h>
11 #include <aws/common/string.h>
12 
13 #include <aws/io/event_loop.h>
14 #include <aws/io/logging.h>
15 
16 #include <arpa/inet.h>
17 #include <aws/io/io.h>
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <sys/socket.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24 
/*
 * SIGPIPE ("NoPipe") suppression is configured differently per platform:
 *  - On macOS (__MACH__), it is requested via a socket option passed to setsockopt()
 *    (NO_SIGNAL_SOCK_OPT -> SO_NOSIGPIPE), so send() gets no extra flag (NO_SIGNAL_SEND is 0).
 *  - Elsewhere (e.g. Linux), it is requested by passing MSG_NOSIGNAL in the flags to send().
 *
 * On macOS, TCP_KEEPIDLE is additionally aliased to TCP_KEEPALIVE so the rest of the file can use
 * a single name for that option.
 */
#if defined(__MACH__)
#    define NO_SIGNAL_SOCK_OPT SO_NOSIGPIPE
#    define NO_SIGNAL_SEND 0
#    define TCP_KEEPIDLE TCP_KEEPALIVE
#else
#    define NO_SIGNAL_SEND MSG_NOSIGNAL
#endif
36 
37 /* This isn't defined on ancient linux distros (breaking the builds).
38  * However, if this is a prebuild, we purposely build on an ancient system, but
39  * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
40  * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
41  * gets passed as long as it does.
42  */
43 #ifndef O_CLOEXEC
44 #    define O_CLOEXEC 02000000
45 #endif
46 
47 #ifdef USE_VSOCK
48 #    if defined(__linux__) && defined(AF_VSOCK)
49 #        include <linux/vm_sockets.h>
50 #    else
51 #        error "USE_VSOCK not supported on current platform"
52 #    endif
53 #endif
54 
/* Socket lifecycle states. Apart from the CONNECTED_READ | CONNECTED_WRITE combination,
 * a socket holds exactly one of these values at a time.
 *
 * NOTE: CLOSED has no explicit initializer, so it takes the next sequential value after
 * ERROR (0x81) rather than a bit of its own; test for it with ==, never with a bit mask. */
enum socket_state {
    INIT = 1 << 0,
    CONNECTING = 1 << 1,
    CONNECTED_READ = 1 << 2,
    CONNECTED_WRITE = 1 << 3,
    BOUND = 1 << 4,
    LISTENING = 1 << 5,
    TIMEDOUT = 1 << 6,
    ERROR = 1 << 7,
    CLOSED,
};
68 
s_convert_domain(enum aws_socket_domain domain)69 static int s_convert_domain(enum aws_socket_domain domain) {
70     switch (domain) {
71         case AWS_SOCKET_IPV4:
72             return AF_INET;
73         case AWS_SOCKET_IPV6:
74             return AF_INET6;
75         case AWS_SOCKET_LOCAL:
76             return AF_UNIX;
77 #ifdef USE_VSOCK
78         case AWS_SOCKET_VSOCK:
79             return AF_VSOCK;
80 #endif
81         default:
82             AWS_ASSERT(0);
83             return AF_INET;
84     }
85 }
86 
s_convert_type(enum aws_socket_type type)87 static int s_convert_type(enum aws_socket_type type) {
88     switch (type) {
89         case AWS_SOCKET_STREAM:
90             return SOCK_STREAM;
91         case AWS_SOCKET_DGRAM:
92             return SOCK_DGRAM;
93         default:
94             AWS_ASSERT(0);
95             return SOCK_STREAM;
96     }
97 }
98 
/* Maps an errno value reported by a socket call to the corresponding AWS error code.
 * Any errno without an entry in the table maps to AWS_IO_SOCKET_NOT_CONNECTED. */
static int s_determine_socket_error(int error) {
    static const struct {
        int sys_errno;
        int aws_error;
    } s_errno_map[] = {
        {ECONNREFUSED, AWS_IO_SOCKET_CONNECTION_REFUSED},
        {ETIMEDOUT, AWS_IO_SOCKET_TIMEOUT},
        {EHOSTUNREACH, AWS_IO_SOCKET_NO_ROUTE_TO_HOST},
        {ENETUNREACH, AWS_IO_SOCKET_NO_ROUTE_TO_HOST},
        {EADDRNOTAVAIL, AWS_IO_SOCKET_INVALID_ADDRESS},
        {ENETDOWN, AWS_IO_SOCKET_NETWORK_DOWN},
        {ECONNABORTED, AWS_IO_SOCKET_CONNECT_ABORTED},
        {EADDRINUSE, AWS_IO_SOCKET_ADDRESS_IN_USE},
        {ENOBUFS, AWS_ERROR_OOM},
        {ENOMEM, AWS_ERROR_OOM},
        {EAGAIN, AWS_IO_READ_WOULD_BLOCK},
        {EMFILE, AWS_ERROR_MAX_FDS_EXCEEDED},
        {ENFILE, AWS_ERROR_MAX_FDS_EXCEEDED},
        {ENOENT, AWS_ERROR_FILE_INVALID_PATH},
        {EINVAL, AWS_ERROR_FILE_INVALID_PATH},
        {EAFNOSUPPORT, AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY},
        {EACCES, AWS_ERROR_NO_PERMISSION},
    };

    const size_t entry_count = sizeof(s_errno_map) / sizeof(s_errno_map[0]);
    for (size_t i = 0; i < entry_count; ++i) {
        if (s_errno_map[i].sys_errno == error) {
            return s_errno_map[i].aws_error;
        }
    }

    return AWS_IO_SOCKET_NOT_CONNECTED;
}
135 
s_create_socket(struct aws_socket * sock,const struct aws_socket_options * options)136 static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) {
137 
138     int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0);
139     AWS_LOGF_DEBUG(
140         AWS_LS_IO_SOCKET,
141         "id=%p fd=%d: initializing with domain %d and type %d",
142         (void *)sock,
143         fd,
144         options->domain,
145         options->type);
146     if (fd != -1) {
147         int flags = fcntl(fd, F_GETFL, 0);
148         flags |= O_NONBLOCK | O_CLOEXEC;
149         int success = fcntl(fd, F_SETFL, flags);
150         (void)success;
151         sock->io_handle.data.fd = fd;
152         sock->io_handle.additional_data = NULL;
153         return aws_socket_set_options(sock, options);
154     }
155 
156     int aws_error = s_determine_socket_error(errno);
157     return aws_raise_error(aws_error);
158 }
159 
/* Bookkeeping for one in-flight connect attempt, allocated by aws_socket_connect().
 * The same struct is the argument of the scheduled task (s_handle_socket_timeout or
 * s_run_connect_success) and the user data of the s_socket_connect_event I/O callback. */
struct posix_socket_connect_args {
    /* task scheduled on the event loop; its arg points back at this struct */
    struct aws_task task;
    /* allocator this struct is released with */
    struct aws_allocator *allocator;
    /* socket being connected; set to NULL once the attempt has been resolved so that
     * a later callback knows there is nothing left to do */
    struct aws_socket *socket;
};
165 
/* POSIX-specific state stored in aws_socket::impl. Allocated zeroed by s_socket_init() and
 * released by s_socket_destroy_impl() once internal_refcount drops to zero. */
struct posix_socket {
    /* Both lists are initialized empty in s_socket_init().
     * NOTE(review): presumably pending and completed write requests; the code using them is
     * outside this section - confirm. */
    struct aws_linked_list write_queue;
    struct aws_linked_list written_queue;
    /* NOTE(review): presumably the task that reports completed writes; usage not visible here. */
    struct aws_task written_task;
    /* Non-NULL only while a connect attempt started by aws_socket_connect() is unresolved. */
    struct posix_socket_connect_args *connect_args;
    /* Note that only the posix_socket impl part is refcounted.
     * The public aws_socket can be a stack variable and cleaned up synchronously
     * (by blocking until the event-loop cleans up the impl part).
     * In hindsight, aws_socket should have been heap-allocated and refcounted, but alas */
    struct aws_ref_count internal_refcount;
    /* allocator this struct is released with */
    struct aws_allocator *allocator;
    /* NOTE(review): presumably true while written_task is scheduled; usage not visible here. */
    bool written_task_scheduled;
    /* true while the socket's io_handle is subscribed to event-loop I/O events */
    bool currently_subscribed;
    /* initialized to false; NOTE(review): usage (accept loop?) not visible here - confirm. */
    bool continue_accept;
    /* initialized to NULL; NOTE(review): usage not visible here - confirm. */
    bool *close_happened;
};
182 
s_socket_destroy_impl(void * user_data)183 static void s_socket_destroy_impl(void *user_data) {
184     struct posix_socket *socket_impl = user_data;
185     aws_mem_release(socket_impl->allocator, socket_impl);
186 }
187 
s_socket_init(struct aws_socket * socket,struct aws_allocator * alloc,const struct aws_socket_options * options,int existing_socket_fd)188 static int s_socket_init(
189     struct aws_socket *socket,
190     struct aws_allocator *alloc,
191     const struct aws_socket_options *options,
192     int existing_socket_fd) {
193     AWS_ASSERT(options);
194     AWS_ZERO_STRUCT(*socket);
195 
196     struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket));
197     if (!posix_socket) {
198         socket->impl = NULL;
199         return AWS_OP_ERR;
200     }
201 
202     socket->allocator = alloc;
203     socket->io_handle.data.fd = -1;
204     socket->state = INIT;
205     socket->options = *options;
206 
207     if (existing_socket_fd < 0) {
208         int err = s_create_socket(socket, options);
209         if (err) {
210             aws_mem_release(alloc, posix_socket);
211             socket->impl = NULL;
212             return AWS_OP_ERR;
213         }
214     } else {
215         socket->io_handle = (struct aws_io_handle){
216             .data = {.fd = existing_socket_fd},
217             .additional_data = NULL,
218         };
219         aws_socket_set_options(socket, options);
220     }
221 
222     aws_linked_list_init(&posix_socket->write_queue);
223     aws_linked_list_init(&posix_socket->written_queue);
224     posix_socket->currently_subscribed = false;
225     posix_socket->continue_accept = false;
226     aws_ref_count_init(&posix_socket->internal_refcount, posix_socket, s_socket_destroy_impl);
227     posix_socket->allocator = alloc;
228     posix_socket->connect_args = NULL;
229     posix_socket->close_happened = NULL;
230     socket->impl = posix_socket;
231     return AWS_OP_SUCCESS;
232 }
233 
/* Public initializer: sets up `socket` with a newly created descriptor configured from `options`.
 * Returns whatever s_socket_init() returns. */
int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) {
    AWS_ASSERT(options);

    /* a negative fd tells s_socket_init() to create the descriptor itself */
    const int no_existing_fd = -1;
    return s_socket_init(socket, alloc, options, no_existing_fd);
}
238 
aws_socket_clean_up(struct aws_socket * socket)239 void aws_socket_clean_up(struct aws_socket *socket) {
240     if (!socket->impl) {
241         /* protect from double clean */
242         return;
243     }
244 
245     int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
246     (void)fd_for_logging;
247 
248     if (aws_socket_is_open(socket)) {
249         AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, fd_for_logging);
250         aws_socket_close(socket);
251     }
252     struct posix_socket *socket_impl = socket->impl;
253 
254     if (aws_ref_count_release(&socket_impl->internal_refcount) != 0) {
255         AWS_LOGF_DEBUG(
256             AWS_LS_IO_SOCKET,
257             "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.",
258             (void *)socket,
259             fd_for_logging);
260     }
261 
262     AWS_ZERO_STRUCT(*socket);
263     socket->io_handle.data.fd = -1;
264 }
265 
266 static void s_on_connection_error(struct aws_socket *socket, int error);
267 
s_on_connection_success(struct aws_socket * socket)268 static int s_on_connection_success(struct aws_socket *socket) {
269 
270     struct aws_event_loop *event_loop = socket->event_loop;
271     struct posix_socket *socket_impl = socket->impl;
272 
273     if (socket_impl->currently_subscribed) {
274         aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
275         socket_impl->currently_subscribed = false;
276     }
277 
278     socket->event_loop = NULL;
279 
280     int connect_result;
281     socklen_t result_length = sizeof(connect_result);
282 
283     if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
284         AWS_LOGF_ERROR(
285             AWS_LS_IO_SOCKET,
286             "id=%p fd=%d: failed to determine connection error %d",
287             (void *)socket,
288             socket->io_handle.data.fd,
289             errno);
290         int aws_error = s_determine_socket_error(errno);
291         aws_raise_error(aws_error);
292         s_on_connection_error(socket, aws_error);
293         return AWS_OP_ERR;
294     }
295 
296     if (connect_result) {
297         AWS_LOGF_ERROR(
298             AWS_LS_IO_SOCKET,
299             "id=%p fd=%d: connection error %d",
300             (void *)socket,
301             socket->io_handle.data.fd,
302             connect_result);
303         int aws_error = s_determine_socket_error(connect_result);
304         aws_raise_error(aws_error);
305         s_on_connection_error(socket, aws_error);
306         return AWS_OP_ERR;
307     }
308 
309     AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd);
310 
311     struct sockaddr_storage address;
312     AWS_ZERO_STRUCT(address);
313     socklen_t address_size = sizeof(address);
314     if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) {
315         uint16_t port = 0;
316 
317         if (address.ss_family == AF_INET) {
318             struct sockaddr_in *s = (struct sockaddr_in *)&address;
319             port = ntohs(s->sin_port);
320             /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal
321              * once we add logging, we can log this if it fails. */
322             if (inet_ntop(
323                     AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) {
324                 AWS_LOGF_DEBUG(
325                     AWS_LS_IO_SOCKET,
326                     "id=%p fd=%d: local endpoint %s:%d",
327                     (void *)socket,
328                     socket->io_handle.data.fd,
329                     socket->local_endpoint.address,
330                     port);
331             } else {
332                 AWS_LOGF_WARN(
333                     AWS_LS_IO_SOCKET,
334                     "id=%p fd=%d: determining local endpoint failed",
335                     (void *)socket,
336                     socket->io_handle.data.fd);
337             }
338         } else if (address.ss_family == AF_INET6) {
339             struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address;
340             port = ntohs(s->sin6_port);
341             /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal
342              * once we add logging, we can log this if it fails. */
343             if (inet_ntop(
344                     AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) {
345                 AWS_LOGF_DEBUG(
346                     AWS_LS_IO_SOCKET,
347                     "id=%p fd %d: local endpoint %s:%d",
348                     (void *)socket,
349                     socket->io_handle.data.fd,
350                     socket->local_endpoint.address,
351                     port);
352             } else {
353                 AWS_LOGF_WARN(
354                     AWS_LS_IO_SOCKET,
355                     "id=%p fd=%d: determining local endpoint failed",
356                     (void *)socket,
357                     socket->io_handle.data.fd);
358             }
359         }
360 
361         socket->local_endpoint.port = port;
362     } else {
363         AWS_LOGF_ERROR(
364             AWS_LS_IO_SOCKET,
365             "id=%p fd=%d: getsockname() failed with error %d",
366             (void *)socket,
367             socket->io_handle.data.fd,
368             errno);
369         int aws_error = s_determine_socket_error(errno);
370         aws_raise_error(aws_error);
371         s_on_connection_error(socket, aws_error);
372         return AWS_OP_ERR;
373     }
374 
375     socket->state = CONNECTED_WRITE | CONNECTED_READ;
376 
377     if (aws_socket_assign_to_event_loop(socket, event_loop)) {
378         AWS_LOGF_ERROR(
379             AWS_LS_IO_SOCKET,
380             "id=%p fd=%d: assignment to event loop %p failed with error %d",
381             (void *)socket,
382             socket->io_handle.data.fd,
383             (void *)event_loop,
384             aws_last_error());
385         s_on_connection_error(socket, aws_last_error());
386         return AWS_OP_ERR;
387     }
388 
389     socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data);
390 
391     return AWS_OP_SUCCESS;
392 }
393 
s_on_connection_error(struct aws_socket * socket,int error)394 static void s_on_connection_error(struct aws_socket *socket, int error) {
395     socket->state = ERROR;
396     AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd);
397     if (socket->connection_result_fn) {
398         socket->connection_result_fn(socket, error, socket->connect_accept_user_data);
399     } else if (socket->accept_result_fn) {
400         socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data);
401     }
402 }
403 
404 /* the next two callbacks compete based on which one runs first. if s_socket_connect_event
405  * comes back first, then we set socket_args->socket = NULL and continue on with the connection.
406  * if s_handle_socket_timeout() runs first, is sees socket_args->socket is NULL and just cleans up its memory.
407  * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. */
s_socket_connect_event(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,void * user_data)408 static void s_socket_connect_event(
409     struct aws_event_loop *event_loop,
410     struct aws_io_handle *handle,
411     int events,
412     void *user_data) {
413 
414     (void)event_loop;
415     (void)handle;
416 
417     struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data;
418     AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd);
419 
420     if (socket_args->socket) {
421         AWS_LOGF_TRACE(
422             AWS_LS_IO_SOCKET,
423             "id=%p fd=%d: has not timed out yet proceeding with connection.",
424             (void *)socket_args->socket,
425             handle->data.fd);
426 
427         struct posix_socket *socket_impl = socket_args->socket->impl;
428         if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) &&
429             (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) {
430             struct aws_socket *socket = socket_args->socket;
431             socket_args->socket = NULL;
432             socket_impl->connect_args = NULL;
433             s_on_connection_success(socket);
434             return;
435         }
436 
437         int aws_error = aws_socket_get_error(socket_args->socket);
438         /* we'll get another notification. */
439         if (aws_error == AWS_IO_READ_WOULD_BLOCK) {
440             AWS_LOGF_TRACE(
441                 AWS_LS_IO_SOCKET,
442                 "id=%p fd=%d: spurious event, waiting for another notification.",
443                 (void *)socket_args->socket,
444                 handle->data.fd);
445             return;
446         }
447 
448         struct aws_socket *socket = socket_args->socket;
449         socket_args->socket = NULL;
450         socket_impl->connect_args = NULL;
451         aws_raise_error(aws_error);
452         s_on_connection_error(socket, aws_error);
453     }
454 }
455 
/* Connect-timeout task. If the connect attempt is still unresolved (args->socket is set), the
 * socket is marked TIMEDOUT, detached from the event loop, closed, and the failure is reported:
 * AWS_IO_SOCKET_TIMEOUT when the task ran normally, AWS_IO_EVENT_LOOP_SHUTDOWN otherwise.
 * In every case the connect args are released here. */
static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) {
    (void)task;
    (void)status;

    struct posix_socket_connect_args *connect_args = args;

    AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task);

    /* a resolved connection attempt will have nulled out connect_args->socket */
    struct aws_socket *socket = connect_args->socket;
    if (socket != NULL) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET, "id=%p fd=%d: timed out, shutting down.", (void *)socket, socket->io_handle.data.fd);

        socket->state = TIMEDOUT;

        int error_code;
        if (status == AWS_TASK_STATUS_RUN_READY) {
            error_code = AWS_IO_SOCKET_TIMEOUT;
            aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
        } else {
            error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
            aws_event_loop_free_io_event_resources(socket->event_loop, &socket->io_handle);
        }

        socket->event_loop = NULL;
        struct posix_socket *impl = socket->impl;
        impl->currently_subscribed = false;
        aws_raise_error(error_code);

        /* socket close sets connect_args->socket to NULL and
         * impl->connect_args to NULL. */
        aws_socket_close(socket);
        s_on_connection_error(socket, error_code);
    }

    aws_mem_release(connect_args->allocator, connect_args);
}
493 
494 /* this is used simply for moving a connect_success callback when the connect finished immediately
495  * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no
496  * timeout task scheduled, so in this case the socket_args are cleaned up. */
s_run_connect_success(struct aws_task * task,void * arg,enum aws_task_status status)497 static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) {
498     (void)task;
499     struct posix_socket_connect_args *socket_args = arg;
500 
501     if (socket_args->socket) {
502         struct posix_socket *socket_impl = socket_args->socket->impl;
503         if (status == AWS_TASK_STATUS_RUN_READY) {
504             s_on_connection_success(socket_args->socket);
505         } else {
506             aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED);
507             socket_args->socket->event_loop = NULL;
508             s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED);
509         }
510         socket_impl->connect_args = NULL;
511     }
512 
513     aws_mem_release(socket_args->allocator, socket_args);
514 }
515 
s_convert_pton_error(int pton_code)516 static inline int s_convert_pton_error(int pton_code) {
517     if (pton_code == 0) {
518         return AWS_IO_SOCKET_INVALID_ADDRESS;
519     }
520 
521     return s_determine_socket_error(errno);
522 }
523 
/* Storage large enough for any socket address this file builds. aws_socket_connect() and
 * aws_socket_bind() zero it and then fill in the union member that matches the socket's domain. */
struct socket_address {
    union sock_addr_types {
        struct sockaddr_in addr_in;   /* AWS_SOCKET_IPV4 */
        struct sockaddr_in6 addr_in6; /* AWS_SOCKET_IPV6 */
        struct sockaddr_un un_addr;   /* AWS_SOCKET_LOCAL */
#ifdef USE_VSOCK
        struct sockaddr_vm vm_addr; /* AWS_SOCKET_VSOCK */
#endif
    } sock_addr_types;
};
534 
535 #ifdef USE_VSOCK
/** Convert a string to a VSOCK CID. Respects the calling convention of inet_pton:
 * 0 on error, 1 on success.
 *
 * The whole string must be a base-10 integer (leading whitespace and a sign are accepted, as by
 * strtoll). "-1" maps to VMADDR_CID_ANY; any other value must fit in an unsigned int.
 * On error errno is set (EINVAL for NULL arguments or malformed text, ERANGE for out-of-range
 * values) and *value is left untouched. */
static int parse_cid(const char *cid_str, unsigned int *value) {
    if (cid_str == NULL || value == NULL) {
        errno = EINVAL;
        return 0;
    }
    /* strtoll returns 0 as both error and correct value, so errno and the end pointer
     * are what distinguish the two. */
    errno = 0;
    char *end = NULL;
    /* signed long long so that -1 and out-of-range values can be handled explicitly */
    long long cid = strtoll(cid_str, &end, 10);
    if (errno != 0) {
        return 0;
    }

    /* reject empty/non-numeric input and trailing garbage instead of silently parsing it as 0
     * or as a numeric prefix */
    if (end == cid_str || *end != '\0') {
        errno = EINVAL;
        return 0;
    }

    /* -1U means any, so it's a valid value, but it needs to be converted to
     * unsigned int. */
    if (cid == -1) {
        *value = VMADDR_CID_ANY;
        return 1;
    }

    if (cid < 0 || cid > UINT_MAX) {
        errno = ERANGE;
        return 0;
    }

    /* cast is safe here, edge cases already checked */
    *value = (unsigned int)cid;
    return 1;
}
567 #endif
568 
/**
 * Starts connecting `socket` to `remote_endpoint` using `event_loop`.
 *
 * The socket must not already have an event loop, and must be in the INIT state (datagram
 * sockets may also be in CONNECTED_READ). The endpoint address is parsed according to the
 * socket's domain and connect() is issued on the non-blocking descriptor:
 *  - if connect() succeeds immediately, a task is scheduled on the event loop that completes the
 *    connection (s_run_connect_success);
 *  - if it is in progress (EINPROGRESS/EALREADY), the descriptor is subscribed for writability
 *    (s_socket_connect_event) and a timeout task is scheduled options.connect_timeout_ms from now
 *    (s_handle_socket_timeout);
 *  - otherwise the mapped error is raised.
 *
 * @param on_connection_result stored as the socket's connection-result callback; asserted
 *                             non-NULL for non-datagram sockets.
 * @param user_data            stored and passed back to the callback.
 * @return AWS_OP_SUCCESS if the attempt was started (the outcome is delivered through the
 *         callback), AWS_OP_ERR (or the result of aws_raise_error()) otherwise.
 */
int aws_socket_connect(
    struct aws_socket *socket,
    const struct aws_socket_endpoint *remote_endpoint,
    struct aws_event_loop *event_loop,
    aws_socket_on_connection_result_fn *on_connection_result,
    void *user_data) {
    AWS_ASSERT(event_loop);
    AWS_ASSERT(!socket->event_loop);

    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd);

    if (socket->event_loop) {
        return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
    }

    if (socket->options.type != AWS_SOCKET_DGRAM) {
        AWS_ASSERT(on_connection_result);
        if (socket->state != INIT) {
            return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
        }
    } else { /* UDP socket */
        /* UDP sockets jump to CONNECT_READ if bind is called first */
        if (socket->state != CONNECTED_READ && socket->state != INIT) {
            return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
        }
    }

    size_t address_strlen;
    if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
        return AWS_OP_ERR;
    }

    struct socket_address address;
    AWS_ZERO_STRUCT(address);
    socklen_t sock_size = 0;
    int pton_err = 1;
    if (socket->options.domain == AWS_SOCKET_IPV4) {
        pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
        address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port);
        address.sock_addr_types.addr_in.sin_family = AF_INET;
        sock_size = sizeof(address.sock_addr_types.addr_in);
    } else if (socket->options.domain == AWS_SOCKET_IPV6) {
        pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
        address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port);
        address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
        sock_size = sizeof(address.sock_addr_types.addr_in6);
    } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
        /* Bound the copy by the destination itself rather than by AWS_ADDRESS_MAX_LEN, so the
         * write can never run past sun_path regardless of how that constant is defined. */
        if (address_strlen >= sizeof(address.sock_addr_types.un_addr.sun_path)) {
            AWS_LOGF_ERROR(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: local socket path %s is too long.",
                (void *)socket,
                socket->io_handle.data.fd,
                remote_endpoint->address);
            return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
        }
        address.sock_addr_types.un_addr.sun_family = AF_UNIX;
        /* `address` was zeroed above, so copying at most size - 1 bytes leaves it NUL-terminated */
        strncpy(
            address.sock_addr_types.un_addr.sun_path,
            remote_endpoint->address,
            sizeof(address.sock_addr_types.un_addr.sun_path) - 1);
        sock_size = sizeof(address.sock_addr_types.un_addr);
#ifdef USE_VSOCK
    } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
        pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
        address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
        address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port;
        sock_size = sizeof(address.sock_addr_types.vm_addr);
#endif
    } else {
        AWS_ASSERT(0);
        return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
    }

    if (pton_err != 1) {
        /* map the error first: the mapping may read errno, which logging could overwrite */
        int parse_error = s_convert_pton_error(pton_err);
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: failed to parse address %s:%d.",
            (void *)socket,
            socket->io_handle.data.fd,
            remote_endpoint->address,
            (int)remote_endpoint->port);
        return aws_raise_error(parse_error);
    }

    AWS_LOGF_DEBUG(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: connecting to endpoint %s:%d.",
        (void *)socket,
        socket->io_handle.data.fd,
        remote_endpoint->address,
        (int)remote_endpoint->port);

    socket->state = CONNECTING;
    socket->remote_endpoint = *remote_endpoint;
    socket->connect_accept_user_data = user_data;
    socket->connection_result_fn = on_connection_result;

    struct posix_socket *socket_impl = socket->impl;

    socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args));
    if (!socket_impl->connect_args) {
        return AWS_OP_ERR;
    }

    socket_impl->connect_args->socket = socket;
    socket_impl->connect_args->allocator = socket->allocator;

    socket_impl->connect_args->task.fn = s_handle_socket_timeout;
    socket_impl->connect_args->task.arg = socket_impl->connect_args;

    int connect_result = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
    /* capture errno immediately so nothing that runs later can disturb it */
    int connect_errno = errno;
    socket->event_loop = event_loop;

    if (!connect_result) {
        AWS_LOGF_INFO(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connected immediately, not scheduling timeout.",
            (void *)socket,
            socket->io_handle.data.fd);
        socket_impl->connect_args->task.fn = s_run_connect_success;
        /* the subscription for IO will happen once we setup the connection in the task. Since we already
         * know the connection succeeded, we don't need to register for events yet. */
        aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task);
        return AWS_OP_SUCCESS;
    }

    if (connect_errno == EINPROGRESS || connect_errno == EALREADY) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.",
            (void *)socket,
            socket->io_handle.data.fd);
        /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately
         * and null out the connect args */
        struct aws_task *timeout_task = &socket_impl->connect_args->task;

        socket_impl->currently_subscribed = true;
        /* This event is for when the connection finishes. (the fd will flip writable). */
        if (aws_event_loop_subscribe_to_io_events(
                event_loop,
                &socket->io_handle,
                AWS_IO_EVENT_TYPE_WRITABLE,
                s_socket_connect_event,
                socket_impl->connect_args)) {
            AWS_LOGF_ERROR(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: failed to register with event-loop %p.",
                (void *)socket,
                socket->io_handle.data.fd,
                (void *)event_loop);
            socket_impl->currently_subscribed = false;
            socket->event_loop = NULL;
            goto err_clean_up;
        }

        /* schedule a task to run at the connect timeout interval, if this task runs before the connect
         * happens, we consider that a timeout. */
        uint64_t timeout = 0;
        aws_event_loop_current_clock_time(event_loop, &timeout);
        timeout += aws_timestamp_convert(
            socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: scheduling timeout task for %llu.",
            (void *)socket,
            socket->io_handle.data.fd,
            (unsigned long long)timeout);
        aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout);
        return AWS_OP_SUCCESS;
    }

    /* connect() failed outright: nothing was scheduled or subscribed, so undo and report */
    AWS_LOGF_ERROR(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: connect failed with error code %d.",
        (void *)socket,
        socket->io_handle.data.fd,
        connect_errno);
    aws_raise_error(s_determine_socket_error(connect_errno));
    socket->event_loop = NULL;
    socket_impl->currently_subscribed = false;

err_clean_up:
    aws_mem_release(socket->allocator, socket_impl->connect_args);
    socket_impl->connect_args = NULL;
    return AWS_OP_ERR;
}
748 
aws_socket_bind(struct aws_socket * socket,const struct aws_socket_endpoint * local_endpoint)749 int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) {
750     if (socket->state != INIT) {
751         AWS_LOGF_ERROR(
752             AWS_LS_IO_SOCKET,
753             "id=%p fd=%d: invalid state for bind operation.",
754             (void *)socket,
755             socket->io_handle.data.fd);
756         return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
757     }
758 
759     size_t address_strlen;
760     if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
761         return AWS_OP_ERR;
762     }
763 
764     int error_code = -1;
765 
766     socket->local_endpoint = *local_endpoint;
767     AWS_LOGF_INFO(
768         AWS_LS_IO_SOCKET,
769         "id=%p fd=%d: binding to %s:%d.",
770         (void *)socket,
771         socket->io_handle.data.fd,
772         local_endpoint->address,
773         (int)local_endpoint->port);
774 
775     struct socket_address address;
776     AWS_ZERO_STRUCT(address);
777     socklen_t sock_size = 0;
778     int pton_err = 1;
779     if (socket->options.domain == AWS_SOCKET_IPV4) {
780         pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
781         address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port);
782         address.sock_addr_types.addr_in.sin_family = AF_INET;
783         sock_size = sizeof(address.sock_addr_types.addr_in);
784     } else if (socket->options.domain == AWS_SOCKET_IPV6) {
785         pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
786         address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port);
787         address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
788         sock_size = sizeof(address.sock_addr_types.addr_in6);
789     } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
790         address.sock_addr_types.un_addr.sun_family = AF_UNIX;
791         strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN);
792         sock_size = sizeof(address.sock_addr_types.un_addr);
793 #ifdef USE_VSOCK
794     } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
795         pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
796         address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
797         address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port;
798         sock_size = sizeof(address.sock_addr_types.vm_addr);
799 #endif
800     } else {
801         AWS_ASSERT(0);
802         return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
803     }
804 
805     if (pton_err != 1) {
806         AWS_LOGF_ERROR(
807             AWS_LS_IO_SOCKET,
808             "id=%p fd=%d: failed to parse address %s:%d.",
809             (void *)socket,
810             socket->io_handle.data.fd,
811             local_endpoint->address,
812             (int)local_endpoint->port);
813         return aws_raise_error(s_convert_pton_error(pton_err));
814     }
815 
816     error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
817 
818     if (!error_code) {
819         if (socket->options.type == AWS_SOCKET_STREAM) {
820             socket->state = BOUND;
821         } else {
822             /* e.g. UDP is now readable */
823             socket->state = CONNECTED_READ;
824         }
825         AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd);
826 
827         return AWS_OP_SUCCESS;
828     }
829 
830     socket->state = ERROR;
831     error_code = errno;
832     AWS_LOGF_ERROR(
833         AWS_LS_IO_SOCKET,
834         "id=%p fd=%d: bind failed with error code %d",
835         (void *)socket,
836         socket->io_handle.data.fd,
837         error_code);
838 
839     int aws_error = s_determine_socket_error(error_code);
840     return aws_raise_error(aws_error);
841 }
842 
aws_socket_listen(struct aws_socket * socket,int backlog_size)843 int aws_socket_listen(struct aws_socket *socket, int backlog_size) {
844     if (socket->state != BOUND) {
845         AWS_LOGF_ERROR(
846             AWS_LS_IO_SOCKET,
847             "id=%p fd=%d: invalid state for listen operation. You must call bind first.",
848             (void *)socket,
849             socket->io_handle.data.fd);
850         return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
851     }
852 
853     int error_code = listen(socket->io_handle.data.fd, backlog_size);
854 
855     if (!error_code) {
856         AWS_LOGF_INFO(
857             AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd);
858         socket->state = LISTENING;
859         return AWS_OP_SUCCESS;
860     }
861 
862     AWS_LOGF_ERROR(
863         AWS_LS_IO_SOCKET,
864         "id=%p fd=%d: listen failed with error code %d",
865         (void *)socket,
866         socket->io_handle.data.fd,
867         error_code);
868     error_code = errno;
869     socket->state = ERROR;
870 
871     return aws_raise_error(s_determine_socket_error(error_code));
872 }
873 
874 /* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable,
875  * accepts as many as it can and then returns control to the event loop. */
s_socket_accept_event(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,void * user_data)876 static void s_socket_accept_event(
877     struct aws_event_loop *event_loop,
878     struct aws_io_handle *handle,
879     int events,
880     void *user_data) {
881 
882     (void)event_loop;
883 
884     struct aws_socket *socket = user_data;
885     struct posix_socket *socket_impl = socket->impl;
886 
887     AWS_LOGF_DEBUG(
888         AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);
889 
890     if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
891         int in_fd = 0;
892         while (socket_impl->continue_accept && in_fd != -1) {
893             struct sockaddr_storage in_addr;
894             socklen_t in_len = sizeof(struct sockaddr_storage);
895 
896             in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len);
897             if (in_fd == -1) {
898                 int error = errno;
899 
900                 if (error == EAGAIN || error == EWOULDBLOCK) {
901                     break;
902                 }
903 
904                 int aws_error = aws_socket_get_error(socket);
905                 aws_raise_error(aws_error);
906                 s_on_connection_error(socket, aws_error);
907                 break;
908             }
909 
910             AWS_LOGF_DEBUG(
911                 AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd);
912 
913             struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket));
914 
915             if (!new_sock) {
916                 close(in_fd);
917                 s_on_connection_error(socket, aws_last_error());
918                 continue;
919             }
920 
921             if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) {
922                 aws_mem_release(socket->allocator, new_sock);
923                 s_on_connection_error(socket, aws_last_error());
924                 continue;
925             }
926 
927             new_sock->local_endpoint = socket->local_endpoint;
928             new_sock->state = CONNECTED_READ | CONNECTED_WRITE;
929             uint16_t port = 0;
930 
931             /* get the info on the incoming socket's address */
932             if (in_addr.ss_family == AF_INET) {
933                 struct sockaddr_in *s = (struct sockaddr_in *)&in_addr;
934                 port = ntohs(s->sin_port);
935                 /* this came from the kernel, a.) it won't fail. b.) even if it does
936                  * its not fatal. come back and add logging later. */
937                 if (!inet_ntop(
938                         AF_INET,
939                         &s->sin_addr,
940                         new_sock->remote_endpoint.address,
941                         sizeof(new_sock->remote_endpoint.address))) {
942                     AWS_LOGF_WARN(
943                         AWS_LS_IO_SOCKET,
944                         "id=%p fd=%d:. Failed to determine remote address.",
945                         (void *)socket,
946                         socket->io_handle.data.fd)
947                 }
948                 new_sock->options.domain = AWS_SOCKET_IPV4;
949             } else if (in_addr.ss_family == AF_INET6) {
950                 /* this came from the kernel, a.) it won't fail. b.) even if it does
951                  * its not fatal. come back and add logging later. */
952                 struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr;
953                 port = ntohs(s->sin6_port);
954                 if (!inet_ntop(
955                         AF_INET6,
956                         &s->sin6_addr,
957                         new_sock->remote_endpoint.address,
958                         sizeof(new_sock->remote_endpoint.address))) {
959                     AWS_LOGF_WARN(
960                         AWS_LS_IO_SOCKET,
961                         "id=%p fd=%d:. Failed to determine remote address.",
962                         (void *)socket,
963                         socket->io_handle.data.fd)
964                 }
965                 new_sock->options.domain = AWS_SOCKET_IPV6;
966             } else if (in_addr.ss_family == AF_UNIX) {
967                 new_sock->remote_endpoint = socket->local_endpoint;
968                 new_sock->options.domain = AWS_SOCKET_LOCAL;
969             }
970 
971             new_sock->remote_endpoint.port = port;
972 
973             AWS_LOGF_INFO(
974                 AWS_LS_IO_SOCKET,
975                 "id=%p fd=%d: connected to %s:%d, incoming fd %d",
976                 (void *)socket,
977                 socket->io_handle.data.fd,
978                 new_sock->remote_endpoint.address,
979                 new_sock->remote_endpoint.port,
980                 in_fd);
981 
982             int flags = fcntl(in_fd, F_GETFL, 0);
983 
984             flags |= O_NONBLOCK | O_CLOEXEC;
985             fcntl(in_fd, F_SETFL, flags);
986 
987             bool close_occurred = false;
988             socket_impl->close_happened = &close_occurred;
989             socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data);
990 
991             if (close_occurred) {
992                 return;
993             }
994 
995             socket_impl->close_happened = NULL;
996         }
997     }
998 
999     AWS_LOGF_TRACE(
1000         AWS_LS_IO_SOCKET,
1001         "id=%p fd=%d: finished processing incoming connections, "
1002         "waiting on event-loop notification",
1003         (void *)socket,
1004         socket->io_handle.data.fd);
1005 }
1006 
/**
 * Starts accepting incoming connections on a listening socket.
 *
 * The socket must not already be assigned to an event loop and must be in the LISTENING state.
 * `on_accept_result` and `user_data` are stored on the socket, the socket is assigned to `accept_loop`, and
 * the socket's io handle is subscribed for readable events with s_socket_accept_event as the handler.
 *
 * Returns AWS_OP_SUCCESS once subscribed. If the subscription fails, the accept/subscription flags and the
 * event-loop assignment are rolled back and AWS_OP_ERR is returned.
 */
int aws_socket_start_accept(
    struct aws_socket *socket,
    struct aws_event_loop *accept_loop,
    aws_socket_on_accept_result_fn *on_accept_result,
    void *user_data) {
    AWS_ASSERT(on_accept_result);
    AWS_ASSERT(accept_loop);

    if (socket->event_loop) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: is already assigned to event-loop %p.",
            (void *)socket,
            socket->io_handle.data.fd,
            (void *)socket->event_loop);
        return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
    }

    if (socket->state != LISTENING) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
    }

    struct posix_socket *impl = socket->impl;

    /* everything the accept handler reads must be in place before the subscription goes live */
    socket->accept_result_fn = on_accept_result;
    socket->connect_accept_user_data = user_data;
    socket->event_loop = accept_loop;
    impl->continue_accept = true;
    impl->currently_subscribed = true;

    int subscribe_failed = aws_event_loop_subscribe_to_io_events(
        socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket);

    if (!subscribe_failed) {
        return AWS_OP_SUCCESS;
    }

    AWS_LOGF_ERROR(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: failed to subscribe to event-loop %p.",
        (void *)socket,
        socket->io_handle.data.fd,
        (void *)socket->event_loop);

    /* undo the assignment made above so the socket can be handed to start_accept again */
    impl->continue_accept = false;
    impl->currently_subscribed = false;
    socket->event_loop = NULL;

    return AWS_OP_ERR;
}
1058 
/* State shared between a thread calling aws_socket_stop_accept() from outside the socket's event loop
 * and the task (s_stop_accept_task) that performs the stop on the event-loop thread. */
struct stop_accept_args {
    /* task scheduled on the socket's event loop; runs s_stop_accept_task */
    struct aws_task task;
    /* guards ret_code/invoked and is paired with condition_variable */
    struct aws_mutex mutex;
    /* notified by the task once it has finished */
    struct aws_condition_variable condition_variable;
    /* the listening socket to stop accepting on */
    struct aws_socket *socket;
    /* AWS_OP_SUCCESS, or the aws_last_error() value if the stop failed */
    int ret_code;
    /* set to true by the task when it has run; this is the wait predicate */
    bool invoked;
};
1067 
s_stop_accept_pred(void * arg)1068 static bool s_stop_accept_pred(void *arg) {
1069     struct stop_accept_args *stop_accept_args = arg;
1070     return stop_accept_args->invoked;
1071 }
1072 
s_stop_accept_task(struct aws_task * task,void * arg,enum aws_task_status status)1073 static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1074     (void)task;
1075     (void)status;
1076 
1077     struct stop_accept_args *stop_accept_args = arg;
1078     aws_mutex_lock(&stop_accept_args->mutex);
1079     stop_accept_args->ret_code = AWS_OP_SUCCESS;
1080     if (aws_socket_stop_accept(stop_accept_args->socket)) {
1081         stop_accept_args->ret_code = aws_last_error();
1082     }
1083     stop_accept_args->invoked = true;
1084     aws_condition_variable_notify_one(&stop_accept_args->condition_variable);
1085     aws_mutex_unlock(&stop_accept_args->mutex);
1086 }
1087 
aws_socket_stop_accept(struct aws_socket * socket)1088 int aws_socket_stop_accept(struct aws_socket *socket) {
1089     if (socket->state != LISTENING) {
1090         AWS_LOGF_ERROR(
1091             AWS_LS_IO_SOCKET,
1092             "id=%p fd=%d: is not in a listening state, can't stop_accept.",
1093             (void *)socket,
1094             socket->io_handle.data.fd);
1095         return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
1096     }
1097 
1098     AWS_LOGF_INFO(
1099         AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd);
1100 
1101     if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
1102         struct stop_accept_args args = {
1103             .mutex = AWS_MUTEX_INIT,
1104             .condition_variable = AWS_CONDITION_VARIABLE_INIT,
1105             .invoked = false,
1106             .socket = socket,
1107             .ret_code = AWS_OP_SUCCESS,
1108             .task = {.fn = s_stop_accept_task},
1109         };
1110         AWS_LOGF_INFO(
1111             AWS_LS_IO_SOCKET,
1112             "id=%p fd=%d: stopping accepting new connections from a different thread than "
1113             "the socket is running from. Blocking until it shuts down.",
1114             (void *)socket,
1115             socket->io_handle.data.fd);
1116         /* Look.... I know what I'm doing.... trust me, I'm an engineer.
1117          * We wait on the completion before 'args' goes out of scope.
1118          * NOLINTNEXTLINE */
1119         args.task.arg = &args;
1120         aws_mutex_lock(&args.mutex);
1121         aws_event_loop_schedule_task_now(socket->event_loop, &args.task);
1122         aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args);
1123         aws_mutex_unlock(&args.mutex);
1124         AWS_LOGF_INFO(
1125             AWS_LS_IO_SOCKET,
1126             "id=%p fd=%d: stop accept task finished running.",
1127             (void *)socket,
1128             socket->io_handle.data.fd);
1129 
1130         if (args.ret_code) {
1131             return aws_raise_error(args.ret_code);
1132         }
1133         return AWS_OP_SUCCESS;
1134     }
1135 
1136     int ret_val = AWS_OP_SUCCESS;
1137     struct posix_socket *socket_impl = socket->impl;
1138     if (socket_impl->currently_subscribed) {
1139         ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
1140         socket_impl->currently_subscribed = false;
1141         socket_impl->continue_accept = false;
1142         socket->event_loop = NULL;
1143     }
1144 
1145     return ret_val;
1146 }
1147 
aws_socket_set_options(struct aws_socket * socket,const struct aws_socket_options * options)1148 int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) {
1149     if (socket->options.domain != options->domain || socket->options.type != options->type) {
1150         return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
1151     }
1152 
1153     AWS_LOGF_DEBUG(
1154         AWS_LS_IO_SOCKET,
1155         "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe "
1156         "count %d.",
1157         (void *)socket,
1158         socket->io_handle.data.fd,
1159         (int)options->keepalive,
1160         (int)options->keep_alive_timeout_sec,
1161         (int)options->keep_alive_interval_sec,
1162         (int)options->keep_alive_max_failed_probes);
1163 
1164     socket->options = *options;
1165 
1166 #ifdef NO_SIGNAL_SOCK_OPT
1167     int option_value = 1;
1168     if (AWS_UNLIKELY(setsockopt(
1169             socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL_SOCK_OPT, &option_value, sizeof(option_value)))) {
1170         AWS_LOGF_WARN(
1171             AWS_LS_IO_SOCKET,
1172             "id=%p fd=%d: setsockopt() for NO_SIGNAL_SOCK_OPT failed with errno %d.",
1173             (void *)socket,
1174             socket->io_handle.data.fd,
1175             errno);
1176     }
1177 #endif /* NO_SIGNAL_SOCK_OPT */
1178 
1179     int reuse = 1;
1180     if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) {
1181         AWS_LOGF_WARN(
1182             AWS_LS_IO_SOCKET,
1183             "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.",
1184             (void *)socket,
1185             socket->io_handle.data.fd,
1186             errno);
1187     }
1188 
1189     if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) {
1190         if (socket->options.keepalive) {
1191             int keep_alive = 1;
1192             if (AWS_UNLIKELY(
1193                     setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) {
1194                 AWS_LOGF_WARN(
1195                     AWS_LS_IO_SOCKET,
1196                     "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.",
1197                     (void *)socket,
1198                     socket->io_handle.data.fd,
1199                     errno);
1200             }
1201         }
1202 
1203         if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) {
1204             int ival_in_secs = socket->options.keep_alive_interval_sec;
1205             if (AWS_UNLIKELY(setsockopt(
1206                     socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) {
1207                 AWS_LOGF_WARN(
1208                     AWS_LS_IO_SOCKET,
1209                     "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.",
1210                     (void *)socket,
1211                     socket->io_handle.data.fd,
1212                     errno);
1213             }
1214 
1215             ival_in_secs = socket->options.keep_alive_timeout_sec;
1216             if (AWS_UNLIKELY(setsockopt(
1217                     socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) {
1218                 AWS_LOGF_WARN(
1219                     AWS_LS_IO_SOCKET,
1220                     "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.",
1221                     (void *)socket,
1222                     socket->io_handle.data.fd,
1223                     errno);
1224             }
1225         }
1226 
1227         if (socket->options.keep_alive_max_failed_probes) {
1228             int max_probes = socket->options.keep_alive_max_failed_probes;
1229             if (AWS_UNLIKELY(
1230                     setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) {
1231                 AWS_LOGF_WARN(
1232                     AWS_LS_IO_SOCKET,
1233                     "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.",
1234                     (void *)socket,
1235                     socket->io_handle.data.fd,
1236                     errno);
1237             }
1238         }
1239     }
1240 
1241     return AWS_OP_SUCCESS;
1242 }
1243 
/* One queued write. Requests are linked into the socket's write_queue / written_queue through `node`. */
struct write_request {
    /* cursor over the data for this request; original_buffer_len - cursor_cpy.len is reported as bytes written */
    struct aws_byte_cursor cursor_cpy;
    /* completion callback, invoked with (socket, error code, bytes written, write_user_data) */
    aws_socket_on_write_completed_fn *written_fn;
    /* opaque pointer handed back to written_fn */
    void *write_user_data;
    /* intrusive list linkage */
    struct aws_linked_list_node node;
    /* length of the buffer when the request was created */
    size_t original_buffer_len;
    /* error code reported to written_fn when the request is completed from the written_queue */
    int error_code;
};
1252 
/* State shared between a thread calling aws_socket_close() from outside the socket's event loop
 * and the task (s_close_task) that performs the close on the event-loop thread. */
struct posix_socket_close_args {
    /* guards invoked/ret_code and is paired with condition_variable */
    struct aws_mutex mutex;
    /* notified by the close task once it has finished */
    struct aws_condition_variable condition_variable;
    /* the socket to close */
    struct aws_socket *socket;
    /* set to true by the close task when it has run; this is the wait predicate */
    bool invoked;
    /* AWS_OP_SUCCESS, or the aws_last_error() value if the close failed */
    int ret_code;
};
1260 
s_close_predicate(void * arg)1261 static bool s_close_predicate(void *arg) {
1262     struct posix_socket_close_args *close_args = arg;
1263     return close_args->invoked;
1264 }
1265 
s_close_task(struct aws_task * task,void * arg,enum aws_task_status status)1266 static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1267     (void)task;
1268     (void)status;
1269 
1270     struct posix_socket_close_args *close_args = arg;
1271     aws_mutex_lock(&close_args->mutex);
1272     close_args->ret_code = AWS_OP_SUCCESS;
1273 
1274     if (aws_socket_close(close_args->socket)) {
1275         close_args->ret_code = aws_last_error();
1276     }
1277 
1278     close_args->invoked = true;
1279     aws_condition_variable_notify_one(&close_args->condition_variable);
1280     aws_mutex_unlock(&close_args->mutex);
1281 }
1282 
/**
 * Closes the socket.
 *
 * If the socket is assigned to an event loop and the caller is on a different thread, the close is only
 * permitted for a LISTENING socket: a close task is scheduled on the event loop and this call blocks until
 * it has run. Any other state raises AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE.
 *
 * On the event-loop thread (or with no event loop assigned) the socket is unsubscribed from io events if
 * needed, any in-progress accept loop and pending connect args are told the socket went away, the fd is
 * closed, and completion callbacks fire for every request still in the written and write queues.
 *
 * Returns AWS_OP_SUCCESS on success. Returns AWS_OP_ERR if unsubscribing from the event loop fails, or the
 * result of aws_raise_error() for the cases above.
 */
int aws_socket_close(struct aws_socket *socket) {
    struct posix_socket *socket_impl = socket->impl;
    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd);
    /* remember the loop now: socket->event_loop is reset below, but the written task may still need cancelling */
    struct aws_event_loop *event_loop = socket->event_loop;
    if (socket->event_loop) {
        /* don't freak out on me, this almost never happens, and never occurs inside a channel
         * it only gets hit from a listening socket shutting down or from a unit test. */
        if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
            AWS_LOGF_INFO(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: closing from a different thread than "
                "the socket is running from. Blocking until it closes down.",
                (void *)socket,
                socket->io_handle.data.fd);
            /* the only time we allow this kind of thing is when you're a listener.*/
            if (socket->state != LISTENING) {
                return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
            }

            struct posix_socket_close_args args = {
                .mutex = AWS_MUTEX_INIT,
                .condition_variable = AWS_CONDITION_VARIABLE_INIT,
                .socket = socket,
                .ret_code = AWS_OP_SUCCESS,
                .invoked = false,
            };

            /* both args and close_task live on this stack frame; we block below until the task has run */
            struct aws_task close_task = {
                .fn = s_close_task,
                .arg = &args,
            };

            int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
            (void)fd_for_logging;

            /* take the mutex before scheduling so the task cannot signal before we start waiting */
            aws_mutex_lock(&args.mutex);
            aws_event_loop_schedule_task_now(socket->event_loop, &close_task);
            aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args);
            aws_mutex_unlock(&args.mutex);
            AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, fd_for_logging);
            if (args.ret_code) {
                return aws_raise_error(args.ret_code);
            }

            return AWS_OP_SUCCESS;
        }

        /* on the event-loop thread: detach from io events before the fd goes away */
        if (socket_impl->currently_subscribed) {
            if (socket->state & LISTENING) {
                aws_socket_stop_accept(socket);
            } else {
                int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);

                if (err_code) {
                    return AWS_OP_ERR;
                }
            }
            socket_impl->currently_subscribed = false;
            socket->event_loop = NULL;
        }
    }

    /* tell an accept loop that is currently inside its callback that the socket was closed underneath it */
    if (socket_impl->close_happened) {
        *socket_impl->close_happened = true;
    }

    /* detach any pending connect args from this socket; they are not freed here */
    if (socket_impl->connect_args) {
        socket_impl->connect_args->socket = NULL;
        socket_impl->connect_args = NULL;
    }

    if (aws_socket_is_open(socket)) {
        close(socket->io_handle.data.fd);
        socket->io_handle.data.fd = -1;
        socket->state = CLOSED;

        /* ensure callbacks for pending writes fire (in order) before this close function returns */

        if (socket_impl->written_task_scheduled) {
            aws_event_loop_cancel_task(event_loop, &socket_impl->written_task);
        }

        /* requests already processed: report each one's recorded error code */
        while (!aws_linked_list_empty(&socket_impl->written_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
            struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
            aws_mem_release(socket->allocator, write_request);
        }

        /* requests still waiting to be written: complete them with AWS_IO_SOCKET_CLOSED */
        while (!aws_linked_list_empty(&socket_impl->write_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
            struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, AWS_IO_SOCKET_CLOSED, bytes_written, write_request->write_user_data);
            aws_mem_release(socket->allocator, write_request);
        }
    }

    return AWS_OP_SUCCESS;
}
1384 
aws_socket_shutdown_dir(struct aws_socket * socket,enum aws_channel_direction dir)1385 int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) {
1386     int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1;
1387     AWS_LOGF_DEBUG(
1388         AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir);
1389     if (shutdown(socket->io_handle.data.fd, how)) {
1390         int aws_error = s_determine_socket_error(errno);
1391         return aws_raise_error(aws_error);
1392     }
1393 
1394     if (dir == AWS_CHANNEL_DIR_READ) {
1395         socket->state &= ~CONNECTED_READ;
1396     } else {
1397         socket->state &= ~CONNECTED_WRITE;
1398     }
1399 
1400     return AWS_OP_SUCCESS;
1401 }
1402 
s_written_task(struct aws_task * task,void * arg,enum aws_task_status status)1403 static void s_written_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1404     (void)task;
1405     (void)status;
1406 
1407     struct aws_socket *socket = arg;
1408     struct posix_socket *socket_impl = socket->impl;
1409 
1410     socket_impl->written_task_scheduled = false;
1411 
1412     /* this is to handle a race condition when a callback kicks off a cleanup, or the user decides
1413      * to close the socket based on something they read (SSL validation failed for example).
1414      * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling */
1415     aws_ref_count_acquire(&socket_impl->internal_refcount);
1416 
1417     /* Notes about weird loop:
1418      * 1) Only process the initial contents of queue when this task is run,
1419      *    ignoring any writes queued during delivery.
1420      *    If we simply looped until the queue was empty, we could get into a
1421      *    synchronous loop of completing and writing and completing and writing...
1422      *    and it would be tough for multiple sockets to share an event-loop fairly.
1423      * 2) Check if queue is empty with each iteration.
1424      *    If user calls close() from the callback, close() will process all
1425      *    nodes in the written_queue, and the queue will be empty when the
1426      *    callstack gets back to here. */
1427     if (!aws_linked_list_empty(&socket_impl->written_queue)) {
1428         struct aws_linked_list_node *stop_after = aws_linked_list_back(&socket_impl->written_queue);
1429         do {
1430             struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
1431             struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
1432             size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
1433             write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
1434             aws_mem_release(socket_impl->allocator, write_request);
1435             if (node == stop_after) {
1436                 break;
1437             }
1438         } while (!aws_linked_list_empty(&socket_impl->written_queue));
1439     }
1440 
1441     aws_ref_count_release(&socket_impl->internal_refcount);
1442 }
1443 
1444 /* this gets called in two scenarios.
1445  * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned
1446  * immediately if something bad has happened to the socket. In this case, `parent_request` is set.
1447  * 2nd scenario, the event loop notified us that the socket went writable. In this case `parent_request` is NULL */
s_process_write_requests(struct aws_socket * socket,struct write_request * parent_request)1448 static int s_process_write_requests(struct aws_socket *socket, struct write_request *parent_request) {
1449     struct posix_socket *socket_impl = socket->impl;
1450 
1451     if (parent_request) {
1452         AWS_LOGF_TRACE(
1453             AWS_LS_IO_SOCKET,
1454             "id=%p fd=%d: processing write requests, called from aws_socket_write",
1455             (void *)socket,
1456             socket->io_handle.data.fd);
1457     } else {
1458         AWS_LOGF_TRACE(
1459             AWS_LS_IO_SOCKET,
1460             "id=%p fd=%d: processing write requests, invoked by the event-loop",
1461             (void *)socket,
1462             socket->io_handle.data.fd);
1463     }
1464 
1465     bool purge = false;
1466     int aws_error = AWS_OP_SUCCESS;
1467     bool parent_request_failed = false;
1468     bool pushed_to_written_queue = false;
1469 
1470     /* if a close call happens in the middle, this queue will have been cleaned out from under us. */
1471     while (!aws_linked_list_empty(&socket_impl->write_queue)) {
1472         struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue);
1473         struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
1474 
1475         AWS_LOGF_TRACE(
1476             AWS_LS_IO_SOCKET,
1477             "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu",
1478             (void *)socket,
1479             socket->io_handle.data.fd,
1480             (unsigned long long)write_request->original_buffer_len,
1481             (unsigned long long)write_request->cursor_cpy.len);
1482 
1483         ssize_t written = send(
1484             socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL_SEND);
1485 
1486         AWS_LOGF_TRACE(
1487             AWS_LS_IO_SOCKET,
1488             "id=%p fd=%d: send written size %d",
1489             (void *)socket,
1490             socket->io_handle.data.fd,
1491             (int)written);
1492 
1493         if (written < 0) {
1494             int error = errno;
1495             if (error == EAGAIN) {
1496                 AWS_LOGF_TRACE(
1497                     AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd);
1498                 break;
1499             }
1500 
1501             if (error == EPIPE) {
1502                 AWS_LOGF_DEBUG(
1503                     AWS_LS_IO_SOCKET,
1504                     "id=%p fd=%d: already closed before write",
1505                     (void *)socket,
1506                     socket->io_handle.data.fd);
1507                 aws_error = AWS_IO_SOCKET_CLOSED;
1508                 aws_raise_error(aws_error);
1509                 purge = true;
1510                 break;
1511             }
1512 
1513             purge = true;
1514             AWS_LOGF_DEBUG(
1515                 AWS_LS_IO_SOCKET,
1516                 "id=%p fd=%d: write error with error code %d",
1517                 (void *)socket,
1518                 socket->io_handle.data.fd,
1519                 error);
1520             aws_error = s_determine_socket_error(error);
1521             aws_raise_error(aws_error);
1522             break;
1523         }
1524 
1525         size_t remaining_to_write = write_request->cursor_cpy.len;
1526 
1527         aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written);
1528         AWS_LOGF_TRACE(
1529             AWS_LS_IO_SOCKET,
1530             "id=%p fd=%d: remaining write request to write %llu",
1531             (void *)socket,
1532             socket->io_handle.data.fd,
1533             (unsigned long long)write_request->cursor_cpy.len);
1534 
1535         if ((size_t)written == remaining_to_write) {
1536             AWS_LOGF_TRACE(
1537                 AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd);
1538 
1539             aws_linked_list_remove(node);
1540             write_request->error_code = AWS_ERROR_SUCCESS;
1541             aws_linked_list_push_back(&socket_impl->written_queue, node);
1542             pushed_to_written_queue = true;
1543         }
1544     }
1545 
1546     if (purge) {
1547         while (!aws_linked_list_empty(&socket_impl->write_queue)) {
1548             struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
1549             struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
1550 
1551             /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback
1552              * as the user will be able to rely on the return value from aws_socket_write() */
1553             if (write_request == parent_request) {
1554                 parent_request_failed = true;
1555                 aws_mem_release(socket->allocator, write_request);
1556             } else {
1557                 write_request->error_code = aws_error;
1558                 aws_linked_list_push_back(&socket_impl->written_queue, node);
1559                 pushed_to_written_queue = true;
1560             }
1561         }
1562     }
1563 
1564     if (pushed_to_written_queue && !socket_impl->written_task_scheduled) {
1565         socket_impl->written_task_scheduled = true;
1566         aws_task_init(&socket_impl->written_task, s_written_task, socket, "socket_written_task");
1567         aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
1568     }
1569 
1570     /* Only report error if aws_socket_write() invoked this function and its write_request failed */
1571     if (!parent_request_failed) {
1572         return AWS_OP_SUCCESS;
1573     }
1574 
1575     aws_raise_error(aws_error);
1576     return AWS_OP_ERR;
1577 }
1578 
s_on_socket_io_event(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,void * user_data)1579 static void s_on_socket_io_event(
1580     struct aws_event_loop *event_loop,
1581     struct aws_io_handle *handle,
1582     int events,
1583     void *user_data) {
1584     (void)event_loop;
1585     (void)handle;
1586     struct aws_socket *socket = user_data;
1587     struct posix_socket *socket_impl = socket->impl;
1588 
1589     /* this is to handle a race condition when an error kicks off a cleanup, or the user decides
1590      * to close the socket based on something they read (SSL validation failed for example).
1591      * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling but currently
1592      * subscribed is set to false. */
1593     aws_ref_count_acquire(&socket_impl->internal_refcount);
1594 
1595     if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
1596         aws_raise_error(AWS_IO_SOCKET_CLOSED);
1597         AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
1598         if (socket->readable_fn) {
1599             socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
1600         }
1601         goto end_check;
1602     }
1603 
1604     if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) {
1605         int aws_error = aws_socket_get_error(socket);
1606         aws_raise_error(aws_error);
1607         AWS_LOGF_TRACE(
1608             AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd);
1609         if (socket->readable_fn) {
1610             socket->readable_fn(socket, aws_error, socket->readable_user_data);
1611         }
1612         goto end_check;
1613     }
1614 
1615     if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
1616         AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
1617         if (socket->readable_fn) {
1618             socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
1619         }
1620     }
1621     /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
1622      * have been cleaned up, so this next branch is safe. */
1623     if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
1624         AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
1625         s_process_write_requests(socket, NULL);
1626     }
1627 
1628 end_check:
1629     aws_ref_count_release(&socket_impl->internal_refcount);
1630 }
1631 
aws_socket_assign_to_event_loop(struct aws_socket * socket,struct aws_event_loop * event_loop)1632 int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) {
1633     if (!socket->event_loop) {
1634         AWS_LOGF_DEBUG(
1635             AWS_LS_IO_SOCKET,
1636             "id=%p fd=%d: assigning to event loop %p",
1637             (void *)socket,
1638             socket->io_handle.data.fd,
1639             (void *)event_loop);
1640         socket->event_loop = event_loop;
1641         struct posix_socket *socket_impl = socket->impl;
1642         socket_impl->currently_subscribed = true;
1643         if (aws_event_loop_subscribe_to_io_events(
1644                 event_loop,
1645                 &socket->io_handle,
1646                 AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE,
1647                 s_on_socket_io_event,
1648                 socket)) {
1649             AWS_LOGF_ERROR(
1650                 AWS_LS_IO_SOCKET,
1651                 "id=%p fd=%d: assigning to event loop %p failed with error %d",
1652                 (void *)socket,
1653                 socket->io_handle.data.fd,
1654                 (void *)event_loop,
1655                 aws_last_error());
1656             socket_impl->currently_subscribed = false;
1657             socket->event_loop = NULL;
1658             return AWS_OP_ERR;
1659         }
1660 
1661         return AWS_OP_SUCCESS;
1662     }
1663 
1664     return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
1665 }
1666 
aws_socket_get_event_loop(struct aws_socket * socket)1667 struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) {
1668     return socket->event_loop;
1669 }
1670 
/* Installs `on_readable` (with `user_data`) as the socket's readable callback.
 *
 * Raises AWS_IO_SOCKET_NOT_CONNECTED if the socket's state lacks CONNECTED_READ, and
 * AWS_ERROR_IO_ALREADY_SUBSCRIBED if a readable callback is already installed. `on_readable` must not be NULL
 * (asserted). Returns AWS_OP_SUCCESS once the callback is stored. */
int aws_socket_subscribe_to_readable_events(
    struct aws_socket *socket,
    aws_socket_on_readable_fn *on_readable,
    void *user_data) {

    AWS_LOGF_TRACE(
        AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd);

    const bool read_side_connected = (socket->state & CONNECTED_READ) != 0;
    if (!read_side_connected) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: can't subscribe to readable events since the socket is not connected",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
    }

    /* Only a single readable subscription is allowed per socket. */
    if (socket->readable_fn != NULL) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: can't subscribe to readable events since it is already subscribed",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
    }

    AWS_ASSERT(on_readable);
    socket->readable_fn = on_readable;
    socket->readable_user_data = user_data;

    return AWS_OP_SUCCESS;
}
1702 
aws_socket_read(struct aws_socket * socket,struct aws_byte_buf * buffer,size_t * amount_read)1703 int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) {
1704     AWS_ASSERT(amount_read);
1705 
1706     if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
1707         AWS_LOGF_ERROR(
1708             AWS_LS_IO_SOCKET,
1709             "id=%p fd=%d: cannot read from a different thread than event loop %p",
1710             (void *)socket,
1711             socket->io_handle.data.fd,
1712             (void *)socket->event_loop);
1713         return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
1714     }
1715 
1716     if (!(socket->state & CONNECTED_READ)) {
1717         AWS_LOGF_ERROR(
1718             AWS_LS_IO_SOCKET,
1719             "id=%p fd=%d: cannot read because it is not connected",
1720             (void *)socket,
1721             socket->io_handle.data.fd);
1722         return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
1723     }
1724 
1725     ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len);
1726     int error = errno;
1727 
1728     AWS_LOGF_TRACE(
1729         AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val);
1730 
1731     if (read_val > 0) {
1732         *amount_read = (size_t)read_val;
1733         buffer->len += *amount_read;
1734         return AWS_OP_SUCCESS;
1735     }
1736 
1737     /* read_val of 0 means EOF which we'll treat as AWS_IO_SOCKET_CLOSED */
1738     if (read_val == 0) {
1739         AWS_LOGF_INFO(
1740             AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd);
1741         *amount_read = 0;
1742 
1743         if (buffer->capacity - buffer->len > 0) {
1744             return aws_raise_error(AWS_IO_SOCKET_CLOSED);
1745         }
1746 
1747         return AWS_OP_SUCCESS;
1748     }
1749 
1750 #if defined(EWOULDBLOCK)
1751     if (error == EAGAIN || error == EWOULDBLOCK) {
1752 #else
1753     if (error == EAGAIN) {
1754 #endif
1755         AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd);
1756         return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
1757     }
1758 
1759     if (error == EPIPE) {
1760         AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd);
1761         return aws_raise_error(AWS_IO_SOCKET_CLOSED);
1762     }
1763 
1764     if (error == ETIMEDOUT) {
1765         AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd);
1766         return aws_raise_error(AWS_IO_SOCKET_TIMEOUT);
1767     }
1768 
1769     AWS_LOGF_ERROR(
1770         AWS_LS_IO_SOCKET,
1771         "id=%p fd=%d: read failed with error: %s",
1772         (void *)socket,
1773         socket->io_handle.data.fd,
1774         strerror(error));
1775     return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
1776 }
1777 
1778 int aws_socket_write(
1779     struct aws_socket *socket,
1780     const struct aws_byte_cursor *cursor,
1781     aws_socket_on_write_completed_fn *written_fn,
1782     void *user_data) {
1783     if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
1784         return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
1785     }
1786 
1787     if (!(socket->state & CONNECTED_WRITE)) {
1788         AWS_LOGF_ERROR(
1789             AWS_LS_IO_SOCKET,
1790             "id=%p fd=%d: cannot write to because it is not connected",
1791             (void *)socket,
1792             socket->io_handle.data.fd);
1793         return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
1794     }
1795 
1796     AWS_ASSERT(written_fn);
1797     struct posix_socket *socket_impl = socket->impl;
1798     struct write_request *write_request = aws_mem_calloc(socket->allocator, 1, sizeof(struct write_request));
1799 
1800     if (!write_request) {
1801         return AWS_OP_ERR;
1802     }
1803 
1804     write_request->original_buffer_len = cursor->len;
1805     write_request->written_fn = written_fn;
1806     write_request->write_user_data = user_data;
1807     write_request->cursor_cpy = *cursor;
1808     aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node);
1809 
1810     return s_process_write_requests(socket, write_request);
1811 }
1812 
1813 int aws_socket_get_error(struct aws_socket *socket) {
1814     int connect_result;
1815     socklen_t result_length = sizeof(connect_result);
1816 
1817     if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
1818         return AWS_OP_ERR;
1819     }
1820 
1821     if (connect_result) {
1822         return s_determine_socket_error(connect_result);
1823     }
1824 
1825     return AWS_OP_SUCCESS;
1826 }
1827 
1828 bool aws_socket_is_open(struct aws_socket *socket) {
1829     return socket->io_handle.data.fd >= 0;
1830 }
1831