1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/io/socket.h>
7
8 #include <aws/common/clock.h>
9 #include <aws/common/condition_variable.h>
10 #include <aws/common/mutex.h>
11 #include <aws/common/string.h>
12
13 #include <aws/io/event_loop.h>
14 #include <aws/io/logging.h>
15
16 #include <arpa/inet.h>
17 #include <aws/io/io.h>
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <sys/socket.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24
25 /*
 * On macOS, suppress SIGPIPE via a socket option passed to setsockopt().
 * On Linux, suppress SIGPIPE via a flag passed to send().
28 */
29 #if defined(__MACH__)
30 # define NO_SIGNAL_SOCK_OPT SO_NOSIGPIPE
31 # define NO_SIGNAL_SEND 0
32 # define TCP_KEEPIDLE TCP_KEEPALIVE
33 #else
34 # define NO_SIGNAL_SEND MSG_NOSIGNAL
35 #endif
36
37 /* This isn't defined on ancient linux distros (breaking the builds).
38 * However, if this is a prebuild, we purposely build on an ancient system, but
39 * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
40 * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
41 * gets passed as long as it does.
42 */
43 #ifndef O_CLOEXEC
44 # define O_CLOEXEC 02000000
45 #endif
46
47 #ifdef USE_VSOCK
48 # if defined(__linux__) && defined(AF_VSOCK)
49 # include <linux/vm_sockets.h>
50 # else
51 # error "USE_VSOCK not supported on current platform"
52 # endif
53 #endif
54
55 /* other than CONNECTED_READ | CONNECTED_WRITE
56 * a socket is only in one of these states at a time. */
/* Values assigned to aws_socket::state. Every state except CLOSED occupies its own bit so the
 * two connected halves can be OR-ed together. */
enum socket_state {
    INIT = 1 << 0,
    CONNECTING = 1 << 1,
    CONNECTED_READ = 1 << 2,
    CONNECTED_WRITE = 1 << 3,
    BOUND = 1 << 4,
    LISTENING = 1 << 5,
    TIMEDOUT = 1 << 6,
    ERROR = 1 << 7,
    /* Not a dedicated bit: simply the value following ERROR (0x81). */
    CLOSED = ERROR + 1,
};
68
s_convert_domain(enum aws_socket_domain domain)69 static int s_convert_domain(enum aws_socket_domain domain) {
70 switch (domain) {
71 case AWS_SOCKET_IPV4:
72 return AF_INET;
73 case AWS_SOCKET_IPV6:
74 return AF_INET6;
75 case AWS_SOCKET_LOCAL:
76 return AF_UNIX;
77 #ifdef USE_VSOCK
78 case AWS_SOCKET_VSOCK:
79 return AF_VSOCK;
80 #endif
81 default:
82 AWS_ASSERT(0);
83 return AF_INET;
84 }
85 }
86
s_convert_type(enum aws_socket_type type)87 static int s_convert_type(enum aws_socket_type type) {
88 switch (type) {
89 case AWS_SOCKET_STREAM:
90 return SOCK_STREAM;
91 case AWS_SOCKET_DGRAM:
92 return SOCK_DGRAM;
93 default:
94 AWS_ASSERT(0);
95 return SOCK_STREAM;
96 }
97 }
98
/* Maps a POSIX errno value onto the AWS error code used by this file.
 * Any errno that is not listed is reported as AWS_IO_SOCKET_NOT_CONNECTED. */
static int s_determine_socket_error(int error) {
    static const struct {
        int posix_errno;
        int aws_error;
    } s_errno_map[] = {
        {ECONNREFUSED, AWS_IO_SOCKET_CONNECTION_REFUSED},
        {ETIMEDOUT, AWS_IO_SOCKET_TIMEOUT},
        {EHOSTUNREACH, AWS_IO_SOCKET_NO_ROUTE_TO_HOST},
        {ENETUNREACH, AWS_IO_SOCKET_NO_ROUTE_TO_HOST},
        {EADDRNOTAVAIL, AWS_IO_SOCKET_INVALID_ADDRESS},
        {ENETDOWN, AWS_IO_SOCKET_NETWORK_DOWN},
        {ECONNABORTED, AWS_IO_SOCKET_CONNECT_ABORTED},
        {EADDRINUSE, AWS_IO_SOCKET_ADDRESS_IN_USE},
        {ENOBUFS, AWS_ERROR_OOM},
        {ENOMEM, AWS_ERROR_OOM},
        {EAGAIN, AWS_IO_READ_WOULD_BLOCK},
        {EMFILE, AWS_ERROR_MAX_FDS_EXCEEDED},
        {ENFILE, AWS_ERROR_MAX_FDS_EXCEEDED},
        {ENOENT, AWS_ERROR_FILE_INVALID_PATH},
        {EINVAL, AWS_ERROR_FILE_INVALID_PATH},
        {EAFNOSUPPORT, AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY},
        {EACCES, AWS_ERROR_NO_PERMISSION},
    };

    const size_t entry_count = sizeof(s_errno_map) / sizeof(s_errno_map[0]);
    for (size_t i = 0; i < entry_count; ++i) {
        if (s_errno_map[i].posix_errno == error) {
            return s_errno_map[i].aws_error;
        }
    }

    return AWS_IO_SOCKET_NOT_CONNECTED;
}
135
s_create_socket(struct aws_socket * sock,const struct aws_socket_options * options)136 static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) {
137
138 int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0);
139 AWS_LOGF_DEBUG(
140 AWS_LS_IO_SOCKET,
141 "id=%p fd=%d: initializing with domain %d and type %d",
142 (void *)sock,
143 fd,
144 options->domain,
145 options->type);
146 if (fd != -1) {
147 int flags = fcntl(fd, F_GETFL, 0);
148 flags |= O_NONBLOCK | O_CLOEXEC;
149 int success = fcntl(fd, F_SETFL, flags);
150 (void)success;
151 sock->io_handle.data.fd = fd;
152 sock->io_handle.additional_data = NULL;
153 return aws_socket_set_options(sock, options);
154 }
155
156 int aws_error = s_determine_socket_error(errno);
157 return aws_raise_error(aws_error);
158 }
159
/* Heap-allocated bookkeeping for one in-flight connect attempt. It is shared between the IO event
 * callback and the task scheduled for the attempt, and is released by that task. */
struct posix_socket_connect_args {
    /* Task that runs either the connect timeout handler or the immediate-success handler. */
    struct aws_task task;
    /* Allocator used to release this struct. */
    struct aws_allocator *allocator;
    /* Socket being connected; set to NULL once the attempt has been resolved, so whichever
     * handler runs later knows there is nothing left to do. */
    struct aws_socket *socket;
};
165
/* Private, reference-counted implementation state stored in aws_socket::impl. */
struct posix_socket {
    /* Both queues are initialized empty in s_socket_init(); their users are outside this part of
     * the file (presumably pending and completed write requests - confirm against the write path). */
    struct aws_linked_list write_queue;
    struct aws_linked_list written_queue;
    /* NOTE(review): not used in the visible part of the file; presumably the task that reports
     * completed writes - confirm. */
    struct aws_task written_task;
    /* Arguments of the connect attempt currently in flight, or NULL when there is none. */
    struct posix_socket_connect_args *connect_args;
    /* Note that only the posix_socket impl part is refcounted.
     * The public aws_socket can be a stack variable and cleaned up synchronously
     * (by blocking until the event-loop cleans up the impl part).
     * In hindsight, aws_socket should have been heap-allocated and refcounted, but alas */
    struct aws_ref_count internal_refcount;
    /* Allocator used to release this struct when the refcount reaches zero. */
    struct aws_allocator *allocator;
    /* NOTE(review): presumably true while written_task is scheduled - confirm. */
    bool written_task_scheduled;
    /* True while the socket's io_handle is subscribed to event-loop IO events. */
    bool currently_subscribed;
    /* Checked by the accept event handler; the accept loop runs only while this is true. */
    bool continue_accept;
    /* Initialized to NULL in s_socket_init(); its users are outside this part of the file. */
    bool *close_happened;
};
182
s_socket_destroy_impl(void * user_data)183 static void s_socket_destroy_impl(void *user_data) {
184 struct posix_socket *socket_impl = user_data;
185 aws_mem_release(socket_impl->allocator, socket_impl);
186 }
187
s_socket_init(struct aws_socket * socket,struct aws_allocator * alloc,const struct aws_socket_options * options,int existing_socket_fd)188 static int s_socket_init(
189 struct aws_socket *socket,
190 struct aws_allocator *alloc,
191 const struct aws_socket_options *options,
192 int existing_socket_fd) {
193 AWS_ASSERT(options);
194 AWS_ZERO_STRUCT(*socket);
195
196 struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket));
197 if (!posix_socket) {
198 socket->impl = NULL;
199 return AWS_OP_ERR;
200 }
201
202 socket->allocator = alloc;
203 socket->io_handle.data.fd = -1;
204 socket->state = INIT;
205 socket->options = *options;
206
207 if (existing_socket_fd < 0) {
208 int err = s_create_socket(socket, options);
209 if (err) {
210 aws_mem_release(alloc, posix_socket);
211 socket->impl = NULL;
212 return AWS_OP_ERR;
213 }
214 } else {
215 socket->io_handle = (struct aws_io_handle){
216 .data = {.fd = existing_socket_fd},
217 .additional_data = NULL,
218 };
219 aws_socket_set_options(socket, options);
220 }
221
222 aws_linked_list_init(&posix_socket->write_queue);
223 aws_linked_list_init(&posix_socket->written_queue);
224 posix_socket->currently_subscribed = false;
225 posix_socket->continue_accept = false;
226 aws_ref_count_init(&posix_socket->internal_refcount, posix_socket, s_socket_destroy_impl);
227 posix_socket->allocator = alloc;
228 posix_socket->connect_args = NULL;
229 posix_socket->close_happened = NULL;
230 socket->impl = posix_socket;
231 return AWS_OP_SUCCESS;
232 }
233
/* Public initializer: sets up `socket` with a newly created descriptor configured by `options`. */
int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) {
    AWS_ASSERT(options);

    /* A negative descriptor tells s_socket_init() to create a new socket instead of adopting one. */
    const int no_existing_fd = -1;
    return s_socket_init(socket, alloc, options, no_existing_fd);
}
238
aws_socket_clean_up(struct aws_socket * socket)239 void aws_socket_clean_up(struct aws_socket *socket) {
240 if (!socket->impl) {
241 /* protect from double clean */
242 return;
243 }
244
245 int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
246 (void)fd_for_logging;
247
248 if (aws_socket_is_open(socket)) {
249 AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, fd_for_logging);
250 aws_socket_close(socket);
251 }
252 struct posix_socket *socket_impl = socket->impl;
253
254 if (aws_ref_count_release(&socket_impl->internal_refcount) != 0) {
255 AWS_LOGF_DEBUG(
256 AWS_LS_IO_SOCKET,
257 "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.",
258 (void *)socket,
259 fd_for_logging);
260 }
261
262 AWS_ZERO_STRUCT(*socket);
263 socket->io_handle.data.fd = -1;
264 }
265
266 static void s_on_connection_error(struct aws_socket *socket, int error);
267
s_on_connection_success(struct aws_socket * socket)268 static int s_on_connection_success(struct aws_socket *socket) {
269
270 struct aws_event_loop *event_loop = socket->event_loop;
271 struct posix_socket *socket_impl = socket->impl;
272
273 if (socket_impl->currently_subscribed) {
274 aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
275 socket_impl->currently_subscribed = false;
276 }
277
278 socket->event_loop = NULL;
279
280 int connect_result;
281 socklen_t result_length = sizeof(connect_result);
282
283 if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
284 AWS_LOGF_ERROR(
285 AWS_LS_IO_SOCKET,
286 "id=%p fd=%d: failed to determine connection error %d",
287 (void *)socket,
288 socket->io_handle.data.fd,
289 errno);
290 int aws_error = s_determine_socket_error(errno);
291 aws_raise_error(aws_error);
292 s_on_connection_error(socket, aws_error);
293 return AWS_OP_ERR;
294 }
295
296 if (connect_result) {
297 AWS_LOGF_ERROR(
298 AWS_LS_IO_SOCKET,
299 "id=%p fd=%d: connection error %d",
300 (void *)socket,
301 socket->io_handle.data.fd,
302 connect_result);
303 int aws_error = s_determine_socket_error(connect_result);
304 aws_raise_error(aws_error);
305 s_on_connection_error(socket, aws_error);
306 return AWS_OP_ERR;
307 }
308
309 AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd);
310
311 struct sockaddr_storage address;
312 AWS_ZERO_STRUCT(address);
313 socklen_t address_size = sizeof(address);
314 if (!getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size)) {
315 uint16_t port = 0;
316
317 if (address.ss_family == AF_INET) {
318 struct sockaddr_in *s = (struct sockaddr_in *)&address;
319 port = ntohs(s->sin_port);
320 /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal
321 * once we add logging, we can log this if it fails. */
322 if (inet_ntop(
323 AF_INET, &s->sin_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) {
324 AWS_LOGF_DEBUG(
325 AWS_LS_IO_SOCKET,
326 "id=%p fd=%d: local endpoint %s:%d",
327 (void *)socket,
328 socket->io_handle.data.fd,
329 socket->local_endpoint.address,
330 port);
331 } else {
332 AWS_LOGF_WARN(
333 AWS_LS_IO_SOCKET,
334 "id=%p fd=%d: determining local endpoint failed",
335 (void *)socket,
336 socket->io_handle.data.fd);
337 }
338 } else if (address.ss_family == AF_INET6) {
339 struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address;
340 port = ntohs(s->sin6_port);
341 /* this comes straight from the kernal. a.) they won't fail. b.) even if they do, it's not fatal
342 * once we add logging, we can log this if it fails. */
343 if (inet_ntop(
344 AF_INET6, &s->sin6_addr, socket->local_endpoint.address, sizeof(socket->local_endpoint.address))) {
345 AWS_LOGF_DEBUG(
346 AWS_LS_IO_SOCKET,
347 "id=%p fd %d: local endpoint %s:%d",
348 (void *)socket,
349 socket->io_handle.data.fd,
350 socket->local_endpoint.address,
351 port);
352 } else {
353 AWS_LOGF_WARN(
354 AWS_LS_IO_SOCKET,
355 "id=%p fd=%d: determining local endpoint failed",
356 (void *)socket,
357 socket->io_handle.data.fd);
358 }
359 }
360
361 socket->local_endpoint.port = port;
362 } else {
363 AWS_LOGF_ERROR(
364 AWS_LS_IO_SOCKET,
365 "id=%p fd=%d: getsockname() failed with error %d",
366 (void *)socket,
367 socket->io_handle.data.fd,
368 errno);
369 int aws_error = s_determine_socket_error(errno);
370 aws_raise_error(aws_error);
371 s_on_connection_error(socket, aws_error);
372 return AWS_OP_ERR;
373 }
374
375 socket->state = CONNECTED_WRITE | CONNECTED_READ;
376
377 if (aws_socket_assign_to_event_loop(socket, event_loop)) {
378 AWS_LOGF_ERROR(
379 AWS_LS_IO_SOCKET,
380 "id=%p fd=%d: assignment to event loop %p failed with error %d",
381 (void *)socket,
382 socket->io_handle.data.fd,
383 (void *)event_loop,
384 aws_last_error());
385 s_on_connection_error(socket, aws_last_error());
386 return AWS_OP_ERR;
387 }
388
389 socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data);
390
391 return AWS_OP_SUCCESS;
392 }
393
s_on_connection_error(struct aws_socket * socket,int error)394 static void s_on_connection_error(struct aws_socket *socket, int error) {
395 socket->state = ERROR;
396 AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd);
397 if (socket->connection_result_fn) {
398 socket->connection_result_fn(socket, error, socket->connect_accept_user_data);
399 } else if (socket->accept_result_fn) {
400 socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data);
401 }
402 }
403
/* The next two callbacks race: whichever runs first decides the outcome of the connect attempt.
 * If s_socket_connect_event() runs first, it sets socket_args->socket = NULL and carries on with the connection.
 * If s_handle_socket_timeout() then runs, it sees that socket_args->socket is NULL and just cleans up its memory.
 * s_handle_socket_timeout() always runs, so the memory for socket_connect_args is always cleaned up there. */
s_socket_connect_event(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,void * user_data)408 static void s_socket_connect_event(
409 struct aws_event_loop *event_loop,
410 struct aws_io_handle *handle,
411 int events,
412 void *user_data) {
413
414 (void)event_loop;
415 (void)handle;
416
417 struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data;
418 AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd);
419
420 if (socket_args->socket) {
421 AWS_LOGF_TRACE(
422 AWS_LS_IO_SOCKET,
423 "id=%p fd=%d: has not timed out yet proceeding with connection.",
424 (void *)socket_args->socket,
425 handle->data.fd);
426
427 struct posix_socket *socket_impl = socket_args->socket->impl;
428 if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) &&
429 (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) {
430 struct aws_socket *socket = socket_args->socket;
431 socket_args->socket = NULL;
432 socket_impl->connect_args = NULL;
433 s_on_connection_success(socket);
434 return;
435 }
436
437 int aws_error = aws_socket_get_error(socket_args->socket);
438 /* we'll get another notification. */
439 if (aws_error == AWS_IO_READ_WOULD_BLOCK) {
440 AWS_LOGF_TRACE(
441 AWS_LS_IO_SOCKET,
442 "id=%p fd=%d: spurious event, waiting for another notification.",
443 (void *)socket_args->socket,
444 handle->data.fd);
445 return;
446 }
447
448 struct aws_socket *socket = socket_args->socket;
449 socket_args->socket = NULL;
450 socket_impl->connect_args = NULL;
451 aws_raise_error(aws_error);
452 s_on_connection_error(socket, aws_error);
453 }
454 }
455
/* Task scheduled at the connect timeout.
 *
 * If the attempt is still unresolved (socket_args->socket is set) the socket is marked TIMEDOUT,
 * detached from the event loop, closed, and the failure is reported: AWS_IO_SOCKET_TIMEOUT when
 * the task ran normally, AWS_IO_EVENT_LOOP_SHUTDOWN otherwise. The connect args are released in
 * every case. */
static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) {
    (void)task;
    (void)status;

    struct posix_socket_connect_args *connect_args = args;

    AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task);

    /* a resolved attempt has already nulled out connect_args->socket */
    struct aws_socket *socket = connect_args->socket;
    if (socket != NULL) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET, "id=%p fd=%d: timed out, shutting down.", (void *)socket, socket->io_handle.data.fd);

        socket->state = TIMEDOUT;

        int error_code;
        if (status == AWS_TASK_STATUS_RUN_READY) {
            error_code = AWS_IO_SOCKET_TIMEOUT;
            aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
        } else {
            error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
            aws_event_loop_free_io_event_resources(socket->event_loop, &socket->io_handle);
        }

        socket->event_loop = NULL;
        struct posix_socket *impl = socket->impl;
        impl->currently_subscribed = false;
        aws_raise_error(error_code);

        /* closing the socket resets connect_args->socket and impl->connect_args to NULL */
        aws_socket_close(socket);
        s_on_connection_error(socket, error_code);
    }

    aws_mem_release(connect_args->allocator, connect_args);
}
493
494 /* this is used simply for moving a connect_success callback when the connect finished immediately
495 * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no
496 * timeout task scheduled, so in this case the socket_args are cleaned up. */
s_run_connect_success(struct aws_task * task,void * arg,enum aws_task_status status)497 static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) {
498 (void)task;
499 struct posix_socket_connect_args *socket_args = arg;
500
501 if (socket_args->socket) {
502 struct posix_socket *socket_impl = socket_args->socket->impl;
503 if (status == AWS_TASK_STATUS_RUN_READY) {
504 s_on_connection_success(socket_args->socket);
505 } else {
506 aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED);
507 socket_args->socket->event_loop = NULL;
508 s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED);
509 }
510 socket_impl->connect_args = NULL;
511 }
512
513 aws_mem_release(socket_args->allocator, socket_args);
514 }
515
s_convert_pton_error(int pton_code)516 static inline int s_convert_pton_error(int pton_code) {
517 if (pton_code == 0) {
518 return AWS_IO_SOCKET_INVALID_ADDRESS;
519 }
520
521 return s_determine_socket_error(errno);
522 }
523
/* Storage large enough for any socket address this file passes to connect() and bind().
 * Exactly one union member is filled in, selected by the socket's domain. */
struct socket_address {
    union sock_addr_types {
        struct sockaddr_in addr_in;   /* AWS_SOCKET_IPV4 */
        struct sockaddr_in6 addr_in6; /* AWS_SOCKET_IPV6 */
        struct sockaddr_un un_addr;   /* AWS_SOCKET_LOCAL */
#ifdef USE_VSOCK
        struct sockaddr_vm vm_addr; /* AWS_SOCKET_VSOCK */
#endif
    } sock_addr_types;
};
534
535 #ifdef USE_VSOCK
/** Convert a decimal string to a VSOCK CID. Respects the calling convention of inet_pton:
 * 0 on error, 1 on success.
 *
 * Accepted input is a complete base-10 integer in [0, UINT_MAX], or "-1" which maps to
 * VMADDR_CID_ANY. On failure errno is set: EINVAL for NULL arguments, an empty string or
 * non-numeric/trailing characters, ERANGE for out-of-range values. `*value` is only written
 * on success. */
static int parse_cid(const char *cid_str, unsigned int *value) {
    if (cid_str == NULL || value == NULL) {
        errno = EINVAL;
        return 0;
    }
    /* strtoll returns 0 as both error and correct value, so errno and the end pointer decide */
    errno = 0;
    char *parse_end = NULL;
    /* a type wider than unsigned int, so -1 and values above UINT_MAX can be told apart */
    long long cid = strtoll(cid_str, &parse_end, 10);
    if (errno != 0) {
        return 0;
    }

    /* reject input with no digits at all, and input with characters after the number */
    if (parse_end == cid_str || *parse_end != '\0') {
        errno = EINVAL;
        return 0;
    }

    /* -1U means any, so it's a valid value, but it needs to be converted to
     * unsigned int. */
    if (cid == -1) {
        *value = VMADDR_CID_ANY;
        return 1;
    }

    if (cid < 0 || cid > UINT_MAX) {
        errno = ERANGE;
        return 0;
    }

    /* cast is safe here, edge cases already checked */
    *value = (unsigned int)cid;
    return 1;
}
567 #endif
568
/* Starts connecting `socket` to `remote_endpoint`.
 *
 * The socket must not have an event loop assigned yet and must be in INIT state (datagram sockets
 * may also be in CONNECTED_READ, which they enter after bind). The result is delivered later
 * through `on_connection_result` with `user_data`, from a task or IO event on `event_loop`:
 *  - connect() succeeded immediately: a task is scheduled to run the success path;
 *  - connect() is in progress: the fd is subscribed for writability and a timeout task is
 *    scheduled options.connect_timeout_ms from now.
 *
 * Returns AWS_OP_SUCCESS when the attempt was started. Otherwise an error is returned or raised
 * (invalid state, unparsable or over-long address, allocation failure, connect() or subscription
 * failure) and the callback is not invoked. */
int aws_socket_connect(
    struct aws_socket *socket,
    const struct aws_socket_endpoint *remote_endpoint,
    struct aws_event_loop *event_loop,
    aws_socket_on_connection_result_fn *on_connection_result,
    void *user_data) {
    AWS_ASSERT(event_loop);
    AWS_ASSERT(!socket->event_loop);

    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd);

    if (socket->event_loop) {
        return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
    }

    if (socket->options.type != AWS_SOCKET_DGRAM) {
        AWS_ASSERT(on_connection_result);
        if (socket->state != INIT) {
            return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
        }
    } else { /* UDP socket */
        /* UDP sockets jump to CONNECT_READ if bind is called first */
        if (socket->state != CONNECTED_READ && socket->state != INIT) {
            return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
        }
    }

    size_t address_strlen;
    if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
        return AWS_OP_ERR;
    }

    struct socket_address address;
    AWS_ZERO_STRUCT(address);
    socklen_t sock_size = 0;
    /* follows the inet_pton convention: 1 means the address was converted successfully */
    int pton_err = 1;
    if (socket->options.domain == AWS_SOCKET_IPV4) {
        pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
        address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port);
        address.sock_addr_types.addr_in.sin_family = AF_INET;
        sock_size = sizeof(address.sock_addr_types.addr_in);
    } else if (socket->options.domain == AWS_SOCKET_IPV6) {
        pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
        address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port);
        address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
        sock_size = sizeof(address.sock_addr_types.addr_in6);
    } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
        address.sock_addr_types.un_addr.sun_family = AF_UNIX;
        /* The copy must be bounded by sun_path itself, not by AWS_ADDRESS_MAX_LEN: strncpy pads
         * the destination up to the given count, so a larger bound would write past sun_path.
         * The struct was zeroed above, so leaving the last byte alone keeps the path terminated. */
        if (address_strlen < sizeof(address.sock_addr_types.un_addr.sun_path)) {
            strncpy(
                address.sock_addr_types.un_addr.sun_path,
                remote_endpoint->address,
                sizeof(address.sock_addr_types.un_addr.sun_path) - 1);
        } else {
            /* path does not fit: reported as an invalid address by the shared check below */
            pton_err = 0;
        }
        sock_size = sizeof(address.sock_addr_types.un_addr);
#ifdef USE_VSOCK
    } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
        pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
        address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
        address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port;
        sock_size = sizeof(address.sock_addr_types.vm_addr);
#endif
    } else {
        AWS_ASSERT(0);
        return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
    }

    if (pton_err != 1) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: failed to parse address %s:%d.",
            (void *)socket,
            socket->io_handle.data.fd,
            remote_endpoint->address,
            (int)remote_endpoint->port);
        return aws_raise_error(s_convert_pton_error(pton_err));
    }

    AWS_LOGF_DEBUG(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: connecting to endpoint %s:%d.",
        (void *)socket,
        socket->io_handle.data.fd,
        remote_endpoint->address,
        (int)remote_endpoint->port);

    socket->state = CONNECTING;
    socket->remote_endpoint = *remote_endpoint;
    socket->connect_accept_user_data = user_data;
    socket->connection_result_fn = on_connection_result;

    struct posix_socket *socket_impl = socket->impl;

    socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args));
    if (!socket_impl->connect_args) {
        return AWS_OP_ERR;
    }

    socket_impl->connect_args->socket = socket;
    socket_impl->connect_args->allocator = socket->allocator;

    /* default to the timeout handler; replaced below if the connect completes immediately */
    socket_impl->connect_args->task.fn = s_handle_socket_timeout;
    socket_impl->connect_args->task.arg = socket_impl->connect_args;

    int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
    /* only meaningful when connect() failed; captured right away so nothing can overwrite it */
    int connect_errno = errno;
    socket->event_loop = event_loop;

    if (!error_code) {
        AWS_LOGF_INFO(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connected immediately, not scheduling timeout.",
            (void *)socket,
            socket->io_handle.data.fd);
        socket_impl->connect_args->task.fn = s_run_connect_success;
        /* the subscription for IO will happen once we setup the connection in the task. Since we already
         * know the connection succeeded, we don't need to register for events yet. */
        aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task);
        return AWS_OP_SUCCESS;
    }

    error_code = connect_errno;
    if (error_code == EINPROGRESS || error_code == EALREADY) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.",
            (void *)socket,
            socket->io_handle.data.fd);
        /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately
         * and null out the connect args */
        struct aws_task *timeout_task = &socket_impl->connect_args->task;

        socket_impl->currently_subscribed = true;
        /* This event is for when the connection finishes. (the fd will flip writable). */
        if (aws_event_loop_subscribe_to_io_events(
                event_loop,
                &socket->io_handle,
                AWS_IO_EVENT_TYPE_WRITABLE,
                s_socket_connect_event,
                socket_impl->connect_args)) {
            AWS_LOGF_ERROR(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: failed to register with event-loop %p.",
                (void *)socket,
                socket->io_handle.data.fd,
                (void *)event_loop);
            socket_impl->currently_subscribed = false;
            socket->event_loop = NULL;
            goto err_clean_up;
        }

        /* schedule a task to run at the connect timeout interval, if this task runs before the connect
         * happens, we consider that a timeout. */
        uint64_t timeout = 0;
        aws_event_loop_current_clock_time(event_loop, &timeout);
        timeout += aws_timestamp_convert(
            socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: scheduling timeout task for %llu.",
            (void *)socket,
            socket->io_handle.data.fd,
            (unsigned long long)timeout);
        aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout);
    } else {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connect failed with error code %d.",
            (void *)socket,
            socket->io_handle.data.fd,
            error_code);
        int aws_error = s_determine_socket_error(error_code);
        aws_raise_error(aws_error);
        socket->event_loop = NULL;
        socket_impl->currently_subscribed = false;
        goto err_clean_up;
    }
    return AWS_OP_SUCCESS;

err_clean_up:
    /* nothing was scheduled with these args on the failing paths, so they can be freed here */
    aws_mem_release(socket->allocator, socket_impl->connect_args);
    socket_impl->connect_args = NULL;
    return AWS_OP_ERR;
}
748
aws_socket_bind(struct aws_socket * socket,const struct aws_socket_endpoint * local_endpoint)749 int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) {
750 if (socket->state != INIT) {
751 AWS_LOGF_ERROR(
752 AWS_LS_IO_SOCKET,
753 "id=%p fd=%d: invalid state for bind operation.",
754 (void *)socket,
755 socket->io_handle.data.fd);
756 return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
757 }
758
759 size_t address_strlen;
760 if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
761 return AWS_OP_ERR;
762 }
763
764 int error_code = -1;
765
766 socket->local_endpoint = *local_endpoint;
767 AWS_LOGF_INFO(
768 AWS_LS_IO_SOCKET,
769 "id=%p fd=%d: binding to %s:%d.",
770 (void *)socket,
771 socket->io_handle.data.fd,
772 local_endpoint->address,
773 (int)local_endpoint->port);
774
775 struct socket_address address;
776 AWS_ZERO_STRUCT(address);
777 socklen_t sock_size = 0;
778 int pton_err = 1;
779 if (socket->options.domain == AWS_SOCKET_IPV4) {
780 pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
781 address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port);
782 address.sock_addr_types.addr_in.sin_family = AF_INET;
783 sock_size = sizeof(address.sock_addr_types.addr_in);
784 } else if (socket->options.domain == AWS_SOCKET_IPV6) {
785 pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
786 address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port);
787 address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
788 sock_size = sizeof(address.sock_addr_types.addr_in6);
789 } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
790 address.sock_addr_types.un_addr.sun_family = AF_UNIX;
791 strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN);
792 sock_size = sizeof(address.sock_addr_types.un_addr);
793 #ifdef USE_VSOCK
794 } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
795 pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
796 address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
797 address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port;
798 sock_size = sizeof(address.sock_addr_types.vm_addr);
799 #endif
800 } else {
801 AWS_ASSERT(0);
802 return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
803 }
804
805 if (pton_err != 1) {
806 AWS_LOGF_ERROR(
807 AWS_LS_IO_SOCKET,
808 "id=%p fd=%d: failed to parse address %s:%d.",
809 (void *)socket,
810 socket->io_handle.data.fd,
811 local_endpoint->address,
812 (int)local_endpoint->port);
813 return aws_raise_error(s_convert_pton_error(pton_err));
814 }
815
816 error_code = bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
817
818 if (!error_code) {
819 if (socket->options.type == AWS_SOCKET_STREAM) {
820 socket->state = BOUND;
821 } else {
822 /* e.g. UDP is now readable */
823 socket->state = CONNECTED_READ;
824 }
825 AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully bound", (void *)socket, socket->io_handle.data.fd);
826
827 return AWS_OP_SUCCESS;
828 }
829
830 socket->state = ERROR;
831 error_code = errno;
832 AWS_LOGF_ERROR(
833 AWS_LS_IO_SOCKET,
834 "id=%p fd=%d: bind failed with error code %d",
835 (void *)socket,
836 socket->io_handle.data.fd,
837 error_code);
838
839 int aws_error = s_determine_socket_error(error_code);
840 return aws_raise_error(aws_error);
841 }
842
aws_socket_listen(struct aws_socket * socket,int backlog_size)843 int aws_socket_listen(struct aws_socket *socket, int backlog_size) {
844 if (socket->state != BOUND) {
845 AWS_LOGF_ERROR(
846 AWS_LS_IO_SOCKET,
847 "id=%p fd=%d: invalid state for listen operation. You must call bind first.",
848 (void *)socket,
849 socket->io_handle.data.fd);
850 return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
851 }
852
853 int error_code = listen(socket->io_handle.data.fd, backlog_size);
854
855 if (!error_code) {
856 AWS_LOGF_INFO(
857 AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd);
858 socket->state = LISTENING;
859 return AWS_OP_SUCCESS;
860 }
861
862 AWS_LOGF_ERROR(
863 AWS_LS_IO_SOCKET,
864 "id=%p fd=%d: listen failed with error code %d",
865 (void *)socket,
866 socket->io_handle.data.fd,
867 error_code);
868 error_code = errno;
869 socket->state = ERROR;
870
871 return aws_raise_error(s_determine_socket_error(error_code));
872 }
873
874 /* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable,
875 * accepts as many as it can and then returns control to the event loop. */
s_socket_accept_event(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,void * user_data)876 static void s_socket_accept_event(
877 struct aws_event_loop *event_loop,
878 struct aws_io_handle *handle,
879 int events,
880 void *user_data) {
881
882 (void)event_loop;
883
884 struct aws_socket *socket = user_data;
885 struct posix_socket *socket_impl = socket->impl;
886
887 AWS_LOGF_DEBUG(
888 AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);
889
890 if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
891 int in_fd = 0;
892 while (socket_impl->continue_accept && in_fd != -1) {
893 struct sockaddr_storage in_addr;
894 socklen_t in_len = sizeof(struct sockaddr_storage);
895
896 in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len);
897 if (in_fd == -1) {
898 int error = errno;
899
900 if (error == EAGAIN || error == EWOULDBLOCK) {
901 break;
902 }
903
904 int aws_error = aws_socket_get_error(socket);
905 aws_raise_error(aws_error);
906 s_on_connection_error(socket, aws_error);
907 break;
908 }
909
910 AWS_LOGF_DEBUG(
911 AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd);
912
913 struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket));
914
915 if (!new_sock) {
916 close(in_fd);
917 s_on_connection_error(socket, aws_last_error());
918 continue;
919 }
920
921 if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) {
922 aws_mem_release(socket->allocator, new_sock);
923 s_on_connection_error(socket, aws_last_error());
924 continue;
925 }
926
927 new_sock->local_endpoint = socket->local_endpoint;
928 new_sock->state = CONNECTED_READ | CONNECTED_WRITE;
929 uint16_t port = 0;
930
931 /* get the info on the incoming socket's address */
932 if (in_addr.ss_family == AF_INET) {
933 struct sockaddr_in *s = (struct sockaddr_in *)&in_addr;
934 port = ntohs(s->sin_port);
935 /* this came from the kernel, a.) it won't fail. b.) even if it does
936 * its not fatal. come back and add logging later. */
937 if (!inet_ntop(
938 AF_INET,
939 &s->sin_addr,
940 new_sock->remote_endpoint.address,
941 sizeof(new_sock->remote_endpoint.address))) {
942 AWS_LOGF_WARN(
943 AWS_LS_IO_SOCKET,
944 "id=%p fd=%d:. Failed to determine remote address.",
945 (void *)socket,
946 socket->io_handle.data.fd)
947 }
948 new_sock->options.domain = AWS_SOCKET_IPV4;
949 } else if (in_addr.ss_family == AF_INET6) {
950 /* this came from the kernel, a.) it won't fail. b.) even if it does
951 * its not fatal. come back and add logging later. */
952 struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr;
953 port = ntohs(s->sin6_port);
954 if (!inet_ntop(
955 AF_INET6,
956 &s->sin6_addr,
957 new_sock->remote_endpoint.address,
958 sizeof(new_sock->remote_endpoint.address))) {
959 AWS_LOGF_WARN(
960 AWS_LS_IO_SOCKET,
961 "id=%p fd=%d:. Failed to determine remote address.",
962 (void *)socket,
963 socket->io_handle.data.fd)
964 }
965 new_sock->options.domain = AWS_SOCKET_IPV6;
966 } else if (in_addr.ss_family == AF_UNIX) {
967 new_sock->remote_endpoint = socket->local_endpoint;
968 new_sock->options.domain = AWS_SOCKET_LOCAL;
969 }
970
971 new_sock->remote_endpoint.port = port;
972
973 AWS_LOGF_INFO(
974 AWS_LS_IO_SOCKET,
975 "id=%p fd=%d: connected to %s:%d, incoming fd %d",
976 (void *)socket,
977 socket->io_handle.data.fd,
978 new_sock->remote_endpoint.address,
979 new_sock->remote_endpoint.port,
980 in_fd);
981
982 int flags = fcntl(in_fd, F_GETFL, 0);
983
984 flags |= O_NONBLOCK | O_CLOEXEC;
985 fcntl(in_fd, F_SETFL, flags);
986
987 bool close_occurred = false;
988 socket_impl->close_happened = &close_occurred;
989 socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data);
990
991 if (close_occurred) {
992 return;
993 }
994
995 socket_impl->close_happened = NULL;
996 }
997 }
998
999 AWS_LOGF_TRACE(
1000 AWS_LS_IO_SOCKET,
1001 "id=%p fd=%d: finished processing incoming connections, "
1002 "waiting on event-loop notification",
1003 (void *)socket,
1004 socket->io_handle.data.fd);
1005 }
1006
/**
 * Starts accepting connections on a listening socket.
 *
 * The socket must not already be assigned to an event loop and must be in the LISTENING state.
 * `on_accept_result` and `user_data` are stored on the socket and the socket's io handle is subscribed for
 * readable events on `accept_loop` with s_socket_accept_event as the handler.
 *
 * Returns AWS_OP_SUCCESS once subscribed. If the subscription fails, the accept flags and the event-loop
 * assignment are rolled back and AWS_OP_ERR is returned.
 */
int aws_socket_start_accept(
    struct aws_socket *socket,
    struct aws_event_loop *accept_loop,
    aws_socket_on_accept_result_fn *on_accept_result,
    void *user_data) {
    AWS_ASSERT(on_accept_result);
    AWS_ASSERT(accept_loop);

    if (socket->event_loop) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: is already assigned to event-loop %p.",
            (void *)socket,
            socket->io_handle.data.fd,
            (void *)socket->event_loop);
        return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
    }

    if (socket->state != LISTENING) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
    }

    struct posix_socket *impl = socket->impl;

    /* publish everything the accept handler reads before the subscription can fire */
    socket->accept_result_fn = on_accept_result;
    socket->connect_accept_user_data = user_data;
    socket->event_loop = accept_loop;
    impl->continue_accept = true;
    impl->currently_subscribed = true;

    const int subscribe_result = aws_event_loop_subscribe_to_io_events(
        socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket);

    if (!subscribe_result) {
        return AWS_OP_SUCCESS;
    }

    AWS_LOGF_ERROR(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: failed to subscribe to event-loop %p.",
        (void *)socket,
        socket->io_handle.data.fd,
        (void *)socket->event_loop);

    /* undo the assignment made above */
    impl->continue_accept = false;
    impl->currently_subscribed = false;
    socket->event_loop = NULL;

    return AWS_OP_ERR;
}
1058
/* State shared between aws_socket_stop_accept(), when it is called off the event-loop thread, and
 * s_stop_accept_task, which performs the stop on the event-loop thread. The caller blocks on the
 * condition variable until the task sets `invoked`. */
struct stop_accept_args {
    /* task scheduled on the socket's event loop; its fn is s_stop_accept_task */
    struct aws_task task;
    /* held by the task while it runs and by the waiter around the condition wait */
    struct aws_mutex mutex;
    /* signalled by the task once it has finished */
    struct aws_condition_variable condition_variable;
    /* the listening socket to stop accepting on */
    struct aws_socket *socket;
    /* AWS_OP_SUCCESS, or the aws_last_error() value captured when the stop failed */
    int ret_code;
    /* set to true by the task; this is the wait predicate */
    bool invoked;
};
1067
s_stop_accept_pred(void * arg)1068 static bool s_stop_accept_pred(void *arg) {
1069 struct stop_accept_args *stop_accept_args = arg;
1070 return stop_accept_args->invoked;
1071 }
1072
s_stop_accept_task(struct aws_task * task,void * arg,enum aws_task_status status)1073 static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1074 (void)task;
1075 (void)status;
1076
1077 struct stop_accept_args *stop_accept_args = arg;
1078 aws_mutex_lock(&stop_accept_args->mutex);
1079 stop_accept_args->ret_code = AWS_OP_SUCCESS;
1080 if (aws_socket_stop_accept(stop_accept_args->socket)) {
1081 stop_accept_args->ret_code = aws_last_error();
1082 }
1083 stop_accept_args->invoked = true;
1084 aws_condition_variable_notify_one(&stop_accept_args->condition_variable);
1085 aws_mutex_unlock(&stop_accept_args->mutex);
1086 }
1087
aws_socket_stop_accept(struct aws_socket * socket)1088 int aws_socket_stop_accept(struct aws_socket *socket) {
1089 if (socket->state != LISTENING) {
1090 AWS_LOGF_ERROR(
1091 AWS_LS_IO_SOCKET,
1092 "id=%p fd=%d: is not in a listening state, can't stop_accept.",
1093 (void *)socket,
1094 socket->io_handle.data.fd);
1095 return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
1096 }
1097
1098 AWS_LOGF_INFO(
1099 AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd);
1100
1101 if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
1102 struct stop_accept_args args = {
1103 .mutex = AWS_MUTEX_INIT,
1104 .condition_variable = AWS_CONDITION_VARIABLE_INIT,
1105 .invoked = false,
1106 .socket = socket,
1107 .ret_code = AWS_OP_SUCCESS,
1108 .task = {.fn = s_stop_accept_task},
1109 };
1110 AWS_LOGF_INFO(
1111 AWS_LS_IO_SOCKET,
1112 "id=%p fd=%d: stopping accepting new connections from a different thread than "
1113 "the socket is running from. Blocking until it shuts down.",
1114 (void *)socket,
1115 socket->io_handle.data.fd);
1116 /* Look.... I know what I'm doing.... trust me, I'm an engineer.
1117 * We wait on the completion before 'args' goes out of scope.
1118 * NOLINTNEXTLINE */
1119 args.task.arg = &args;
1120 aws_mutex_lock(&args.mutex);
1121 aws_event_loop_schedule_task_now(socket->event_loop, &args.task);
1122 aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args);
1123 aws_mutex_unlock(&args.mutex);
1124 AWS_LOGF_INFO(
1125 AWS_LS_IO_SOCKET,
1126 "id=%p fd=%d: stop accept task finished running.",
1127 (void *)socket,
1128 socket->io_handle.data.fd);
1129
1130 if (args.ret_code) {
1131 return aws_raise_error(args.ret_code);
1132 }
1133 return AWS_OP_SUCCESS;
1134 }
1135
1136 int ret_val = AWS_OP_SUCCESS;
1137 struct posix_socket *socket_impl = socket->impl;
1138 if (socket_impl->currently_subscribed) {
1139 ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
1140 socket_impl->currently_subscribed = false;
1141 socket_impl->continue_accept = false;
1142 socket->event_loop = NULL;
1143 }
1144
1145 return ret_val;
1146 }
1147
aws_socket_set_options(struct aws_socket * socket,const struct aws_socket_options * options)1148 int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) {
1149 if (socket->options.domain != options->domain || socket->options.type != options->type) {
1150 return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
1151 }
1152
1153 AWS_LOGF_DEBUG(
1154 AWS_LS_IO_SOCKET,
1155 "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe "
1156 "count %d.",
1157 (void *)socket,
1158 socket->io_handle.data.fd,
1159 (int)options->keepalive,
1160 (int)options->keep_alive_timeout_sec,
1161 (int)options->keep_alive_interval_sec,
1162 (int)options->keep_alive_max_failed_probes);
1163
1164 socket->options = *options;
1165
1166 #ifdef NO_SIGNAL_SOCK_OPT
1167 int option_value = 1;
1168 if (AWS_UNLIKELY(setsockopt(
1169 socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL_SOCK_OPT, &option_value, sizeof(option_value)))) {
1170 AWS_LOGF_WARN(
1171 AWS_LS_IO_SOCKET,
1172 "id=%p fd=%d: setsockopt() for NO_SIGNAL_SOCK_OPT failed with errno %d.",
1173 (void *)socket,
1174 socket->io_handle.data.fd,
1175 errno);
1176 }
1177 #endif /* NO_SIGNAL_SOCK_OPT */
1178
1179 int reuse = 1;
1180 if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) {
1181 AWS_LOGF_WARN(
1182 AWS_LS_IO_SOCKET,
1183 "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.",
1184 (void *)socket,
1185 socket->io_handle.data.fd,
1186 errno);
1187 }
1188
1189 if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) {
1190 if (socket->options.keepalive) {
1191 int keep_alive = 1;
1192 if (AWS_UNLIKELY(
1193 setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) {
1194 AWS_LOGF_WARN(
1195 AWS_LS_IO_SOCKET,
1196 "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.",
1197 (void *)socket,
1198 socket->io_handle.data.fd,
1199 errno);
1200 }
1201 }
1202
1203 if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) {
1204 int ival_in_secs = socket->options.keep_alive_interval_sec;
1205 if (AWS_UNLIKELY(setsockopt(
1206 socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) {
1207 AWS_LOGF_WARN(
1208 AWS_LS_IO_SOCKET,
1209 "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.",
1210 (void *)socket,
1211 socket->io_handle.data.fd,
1212 errno);
1213 }
1214
1215 ival_in_secs = socket->options.keep_alive_timeout_sec;
1216 if (AWS_UNLIKELY(setsockopt(
1217 socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) {
1218 AWS_LOGF_WARN(
1219 AWS_LS_IO_SOCKET,
1220 "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.",
1221 (void *)socket,
1222 socket->io_handle.data.fd,
1223 errno);
1224 }
1225 }
1226
1227 if (socket->options.keep_alive_max_failed_probes) {
1228 int max_probes = socket->options.keep_alive_max_failed_probes;
1229 if (AWS_UNLIKELY(
1230 setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) {
1231 AWS_LOGF_WARN(
1232 AWS_LS_IO_SOCKET,
1233 "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.",
1234 (void *)socket,
1235 socket->io_handle.data.fd,
1236 errno);
1237 }
1238 }
1239 }
1240
1241 return AWS_OP_SUCCESS;
1242 }
1243
/* One queued aws_socket_write() call. A request sits in the socket's write_queue until it has been fully
 * sent or has failed, then moves to the written_queue until its completion callback has been invoked. */
struct write_request {
    /* copy of the caller's cursor; advanced past the bytes that have already been sent */
    struct aws_byte_cursor cursor_cpy;
    /* completion callback, invoked with the socket, an error code, the byte count and write_user_data */
    aws_socket_on_write_completed_fn *written_fn;
    /* opaque pointer passed back to written_fn */
    void *write_user_data;
    /* intrusive link used for both write_queue and written_queue */
    struct aws_linked_list_node node;
    /* cursor length at the time of queuing; bytes written = original_buffer_len - cursor_cpy.len */
    size_t original_buffer_len;
    /* error code reported to written_fn when the request is completed from the written_queue */
    int error_code;
};
1252
/* State shared between aws_socket_close(), when it is called off the event-loop thread, and s_close_task,
 * which performs the close on the event-loop thread. The caller blocks on the condition variable until
 * the task sets `invoked`. */
struct posix_socket_close_args {
    /* held by the task while it runs and by the waiter around the condition wait */
    struct aws_mutex mutex;
    /* signalled by the task once the close attempt has finished */
    struct aws_condition_variable condition_variable;
    /* the socket to close */
    struct aws_socket *socket;
    /* set to true by the task; this is the wait predicate */
    bool invoked;
    /* AWS_OP_SUCCESS, or the aws_last_error() value captured when the close failed */
    int ret_code;
};
1260
s_close_predicate(void * arg)1261 static bool s_close_predicate(void *arg) {
1262 struct posix_socket_close_args *close_args = arg;
1263 return close_args->invoked;
1264 }
1265
s_close_task(struct aws_task * task,void * arg,enum aws_task_status status)1266 static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1267 (void)task;
1268 (void)status;
1269
1270 struct posix_socket_close_args *close_args = arg;
1271 aws_mutex_lock(&close_args->mutex);
1272 close_args->ret_code = AWS_OP_SUCCESS;
1273
1274 if (aws_socket_close(close_args->socket)) {
1275 close_args->ret_code = aws_last_error();
1276 }
1277
1278 close_args->invoked = true;
1279 aws_condition_variable_notify_one(&close_args->condition_variable);
1280 aws_mutex_unlock(&close_args->mutex);
1281 }
1282
/**
 * Closes the socket and completes every outstanding write request.
 *
 * If the socket is assigned to an event loop and the caller is on a different thread, the close is only
 * permitted for a LISTENING socket: it is scheduled as a task on the event loop and this call blocks until
 * the task has run. Any other state raises AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE.
 *
 * On the event-loop thread (or with no event loop) the socket is unsubscribed from io events, an in-progress
 * accept loop and a pending connect are told about the close, the descriptor is closed, and the completion
 * callbacks of all queued writes are invoked before returning.
 *
 * Returns AWS_OP_SUCCESS, or AWS_OP_ERR if unsubscribing a non-listening socket from the event loop fails
 * (in which case the descriptor is left open).
 */
int aws_socket_close(struct aws_socket *socket) {
    struct posix_socket *socket_impl = socket->impl;
    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd);
    /* remember the loop now: socket->event_loop is reset to NULL below, but the written task may still
     * need to be cancelled on it */
    struct aws_event_loop *event_loop = socket->event_loop;
    if (socket->event_loop) {
        /* don't freak out on me, this almost never happens, and never occurs inside a channel
         * it only gets hit from a listening socket shutting down or from a unit test. */
        if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
            AWS_LOGF_INFO(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: closing from a different thread than "
                "the socket is running from. Blocking until it closes down.",
                (void *)socket,
                socket->io_handle.data.fd);
            /* the only time we allow this kind of thing is when you're a listener.*/
            if (socket->state != LISTENING) {
                return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
            }

            /* both objects live on this stack frame; that is safe because we block below until the
             * task has signalled completion */
            struct posix_socket_close_args args = {
                .mutex = AWS_MUTEX_INIT,
                .condition_variable = AWS_CONDITION_VARIABLE_INIT,
                .socket = socket,
                .ret_code = AWS_OP_SUCCESS,
                .invoked = false,
            };

            struct aws_task close_task = {
                .fn = s_close_task,
                .arg = &args,
            };

            int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
            (void)fd_for_logging;

            /* lock before scheduling so the task cannot signal before we are waiting */
            aws_mutex_lock(&args.mutex);
            aws_event_loop_schedule_task_now(socket->event_loop, &close_task);
            aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args);
            aws_mutex_unlock(&args.mutex);
            AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, fd_for_logging);
            if (args.ret_code) {
                return aws_raise_error(args.ret_code);
            }

            return AWS_OP_SUCCESS;
        }

        if (socket_impl->currently_subscribed) {
            if (socket->state & LISTENING) {
                /* a listener is unsubscribed through stop_accept; its return value is not checked */
                aws_socket_stop_accept(socket);
            } else {
                int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);

                if (err_code) {
                    return AWS_OP_ERR;
                }
            }
            socket_impl->currently_subscribed = false;
            socket->event_loop = NULL;
        }
    }

    /* tell an accept loop that is currently inside the user's callback that the listener was closed */
    if (socket_impl->close_happened) {
        *socket_impl->close_happened = true;
    }

    /* detach from a pending connect so it no longer refers to this socket */
    if (socket_impl->connect_args) {
        socket_impl->connect_args->socket = NULL;
        socket_impl->connect_args = NULL;
    }

    if (aws_socket_is_open(socket)) {
        close(socket->io_handle.data.fd);
        socket->io_handle.data.fd = -1;
        socket->state = CLOSED;

        /* ensure callbacks for pending writes fire (in order) before this close function returns */

        if (socket_impl->written_task_scheduled) {
            aws_event_loop_cancel_task(event_loop, &socket_impl->written_task);
        }

        /* first the requests that already finished: report the result recorded on each of them */
        while (!aws_linked_list_empty(&socket_impl->written_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
            struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
            aws_mem_release(socket->allocator, write_request);
        }

        /* then the requests that never completed: they fail with AWS_IO_SOCKET_CLOSED */
        while (!aws_linked_list_empty(&socket_impl->write_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
            struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, AWS_IO_SOCKET_CLOSED, bytes_written, write_request->write_user_data);
            aws_mem_release(socket->allocator, write_request);
        }
    }

    return AWS_OP_SUCCESS;
}
1384
aws_socket_shutdown_dir(struct aws_socket * socket,enum aws_channel_direction dir)1385 int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) {
1386 int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1;
1387 AWS_LOGF_DEBUG(
1388 AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir);
1389 if (shutdown(socket->io_handle.data.fd, how)) {
1390 int aws_error = s_determine_socket_error(errno);
1391 return aws_raise_error(aws_error);
1392 }
1393
1394 if (dir == AWS_CHANNEL_DIR_READ) {
1395 socket->state &= ~CONNECTED_READ;
1396 } else {
1397 socket->state &= ~CONNECTED_WRITE;
1398 }
1399
1400 return AWS_OP_SUCCESS;
1401 }
1402
s_written_task(struct aws_task * task,void * arg,enum aws_task_status status)1403 static void s_written_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1404 (void)task;
1405 (void)status;
1406
1407 struct aws_socket *socket = arg;
1408 struct posix_socket *socket_impl = socket->impl;
1409
1410 socket_impl->written_task_scheduled = false;
1411
1412 /* this is to handle a race condition when a callback kicks off a cleanup, or the user decides
1413 * to close the socket based on something they read (SSL validation failed for example).
1414 * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling */
1415 aws_ref_count_acquire(&socket_impl->internal_refcount);
1416
1417 /* Notes about weird loop:
1418 * 1) Only process the initial contents of queue when this task is run,
1419 * ignoring any writes queued during delivery.
1420 * If we simply looped until the queue was empty, we could get into a
1421 * synchronous loop of completing and writing and completing and writing...
1422 * and it would be tough for multiple sockets to share an event-loop fairly.
1423 * 2) Check if queue is empty with each iteration.
1424 * If user calls close() from the callback, close() will process all
1425 * nodes in the written_queue, and the queue will be empty when the
1426 * callstack gets back to here. */
1427 if (!aws_linked_list_empty(&socket_impl->written_queue)) {
1428 struct aws_linked_list_node *stop_after = aws_linked_list_back(&socket_impl->written_queue);
1429 do {
1430 struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
1431 struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
1432 size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
1433 write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
1434 aws_mem_release(socket_impl->allocator, write_request);
1435 if (node == stop_after) {
1436 break;
1437 }
1438 } while (!aws_linked_list_empty(&socket_impl->written_queue));
1439 }
1440
1441 aws_ref_count_release(&socket_impl->internal_refcount);
1442 }
1443
1444 /* this gets called in two scenarios.
1445 * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned
1446 * immediately if something bad has happened to the socket. In this case, `parent_request` is set.
1447 * 2nd scenario, the event loop notified us that the socket went writable. In this case `parent_request` is NULL */
s_process_write_requests(struct aws_socket * socket,struct write_request * parent_request)1448 static int s_process_write_requests(struct aws_socket *socket, struct write_request *parent_request) {
1449 struct posix_socket *socket_impl = socket->impl;
1450
1451 if (parent_request) {
1452 AWS_LOGF_TRACE(
1453 AWS_LS_IO_SOCKET,
1454 "id=%p fd=%d: processing write requests, called from aws_socket_write",
1455 (void *)socket,
1456 socket->io_handle.data.fd);
1457 } else {
1458 AWS_LOGF_TRACE(
1459 AWS_LS_IO_SOCKET,
1460 "id=%p fd=%d: processing write requests, invoked by the event-loop",
1461 (void *)socket,
1462 socket->io_handle.data.fd);
1463 }
1464
1465 bool purge = false;
1466 int aws_error = AWS_OP_SUCCESS;
1467 bool parent_request_failed = false;
1468 bool pushed_to_written_queue = false;
1469
1470 /* if a close call happens in the middle, this queue will have been cleaned out from under us. */
1471 while (!aws_linked_list_empty(&socket_impl->write_queue)) {
1472 struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue);
1473 struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
1474
1475 AWS_LOGF_TRACE(
1476 AWS_LS_IO_SOCKET,
1477 "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu",
1478 (void *)socket,
1479 socket->io_handle.data.fd,
1480 (unsigned long long)write_request->original_buffer_len,
1481 (unsigned long long)write_request->cursor_cpy.len);
1482
1483 ssize_t written = send(
1484 socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL_SEND);
1485
1486 AWS_LOGF_TRACE(
1487 AWS_LS_IO_SOCKET,
1488 "id=%p fd=%d: send written size %d",
1489 (void *)socket,
1490 socket->io_handle.data.fd,
1491 (int)written);
1492
1493 if (written < 0) {
1494 int error = errno;
1495 if (error == EAGAIN) {
1496 AWS_LOGF_TRACE(
1497 AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd);
1498 break;
1499 }
1500
1501 if (error == EPIPE) {
1502 AWS_LOGF_DEBUG(
1503 AWS_LS_IO_SOCKET,
1504 "id=%p fd=%d: already closed before write",
1505 (void *)socket,
1506 socket->io_handle.data.fd);
1507 aws_error = AWS_IO_SOCKET_CLOSED;
1508 aws_raise_error(aws_error);
1509 purge = true;
1510 break;
1511 }
1512
1513 purge = true;
1514 AWS_LOGF_DEBUG(
1515 AWS_LS_IO_SOCKET,
1516 "id=%p fd=%d: write error with error code %d",
1517 (void *)socket,
1518 socket->io_handle.data.fd,
1519 error);
1520 aws_error = s_determine_socket_error(error);
1521 aws_raise_error(aws_error);
1522 break;
1523 }
1524
1525 size_t remaining_to_write = write_request->cursor_cpy.len;
1526
1527 aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written);
1528 AWS_LOGF_TRACE(
1529 AWS_LS_IO_SOCKET,
1530 "id=%p fd=%d: remaining write request to write %llu",
1531 (void *)socket,
1532 socket->io_handle.data.fd,
1533 (unsigned long long)write_request->cursor_cpy.len);
1534
1535 if ((size_t)written == remaining_to_write) {
1536 AWS_LOGF_TRACE(
1537 AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd);
1538
1539 aws_linked_list_remove(node);
1540 write_request->error_code = AWS_ERROR_SUCCESS;
1541 aws_linked_list_push_back(&socket_impl->written_queue, node);
1542 pushed_to_written_queue = true;
1543 }
1544 }
1545
1546 if (purge) {
1547 while (!aws_linked_list_empty(&socket_impl->write_queue)) {
1548 struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
1549 struct write_request *write_request = AWS_CONTAINER_OF(node, struct write_request, node);
1550
1551 /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback
1552 * as the user will be able to rely on the return value from aws_socket_write() */
1553 if (write_request == parent_request) {
1554 parent_request_failed = true;
1555 aws_mem_release(socket->allocator, write_request);
1556 } else {
1557 write_request->error_code = aws_error;
1558 aws_linked_list_push_back(&socket_impl->written_queue, node);
1559 pushed_to_written_queue = true;
1560 }
1561 }
1562 }
1563
1564 if (pushed_to_written_queue && !socket_impl->written_task_scheduled) {
1565 socket_impl->written_task_scheduled = true;
1566 aws_task_init(&socket_impl->written_task, s_written_task, socket, "socket_written_task");
1567 aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
1568 }
1569
1570 /* Only report error if aws_socket_write() invoked this function and its write_request failed */
1571 if (!parent_request_failed) {
1572 return AWS_OP_SUCCESS;
1573 }
1574
1575 aws_raise_error(aws_error);
1576 return AWS_OP_ERR;
1577 }
1578
s_on_socket_io_event(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,void * user_data)1579 static void s_on_socket_io_event(
1580 struct aws_event_loop *event_loop,
1581 struct aws_io_handle *handle,
1582 int events,
1583 void *user_data) {
1584 (void)event_loop;
1585 (void)handle;
1586 struct aws_socket *socket = user_data;
1587 struct posix_socket *socket_impl = socket->impl;
1588
1589 /* this is to handle a race condition when an error kicks off a cleanup, or the user decides
1590 * to close the socket based on something they read (SSL validation failed for example).
1591 * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling but currently
1592 * subscribed is set to false. */
1593 aws_ref_count_acquire(&socket_impl->internal_refcount);
1594
1595 if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
1596 aws_raise_error(AWS_IO_SOCKET_CLOSED);
1597 AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
1598 if (socket->readable_fn) {
1599 socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
1600 }
1601 goto end_check;
1602 }
1603
1604 if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) {
1605 int aws_error = aws_socket_get_error(socket);
1606 aws_raise_error(aws_error);
1607 AWS_LOGF_TRACE(
1608 AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd);
1609 if (socket->readable_fn) {
1610 socket->readable_fn(socket, aws_error, socket->readable_user_data);
1611 }
1612 goto end_check;
1613 }
1614
1615 if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
1616 AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
1617 if (socket->readable_fn) {
1618 socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
1619 }
1620 }
1621 /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
1622 * have been cleaned up, so this next branch is safe. */
1623 if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
1624 AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
1625 s_process_write_requests(socket, NULL);
1626 }
1627
1628 end_check:
1629 aws_ref_count_release(&socket_impl->internal_refcount);
1630 }
1631
aws_socket_assign_to_event_loop(struct aws_socket * socket,struct aws_event_loop * event_loop)1632 int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) {
1633 if (!socket->event_loop) {
1634 AWS_LOGF_DEBUG(
1635 AWS_LS_IO_SOCKET,
1636 "id=%p fd=%d: assigning to event loop %p",
1637 (void *)socket,
1638 socket->io_handle.data.fd,
1639 (void *)event_loop);
1640 socket->event_loop = event_loop;
1641 struct posix_socket *socket_impl = socket->impl;
1642 socket_impl->currently_subscribed = true;
1643 if (aws_event_loop_subscribe_to_io_events(
1644 event_loop,
1645 &socket->io_handle,
1646 AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE,
1647 s_on_socket_io_event,
1648 socket)) {
1649 AWS_LOGF_ERROR(
1650 AWS_LS_IO_SOCKET,
1651 "id=%p fd=%d: assigning to event loop %p failed with error %d",
1652 (void *)socket,
1653 socket->io_handle.data.fd,
1654 (void *)event_loop,
1655 aws_last_error());
1656 socket_impl->currently_subscribed = false;
1657 socket->event_loop = NULL;
1658 return AWS_OP_ERR;
1659 }
1660
1661 return AWS_OP_SUCCESS;
1662 }
1663
1664 return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
1665 }
1666
aws_socket_get_event_loop(struct aws_socket * socket)1667 struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) {
1668 return socket->event_loop;
1669 }
1670
/**
 * Registers the callback that is invoked when the socket becomes readable (or is closed / hits an error).
 *
 * The socket must have CONNECTED_READ set in its state, otherwise AWS_IO_SOCKET_NOT_CONNECTED is raised.
 * Only one subscription is allowed: if a callback is already registered,
 * AWS_ERROR_IO_ALREADY_SUBSCRIBED is raised. On success `on_readable` and `user_data` are stored on the
 * socket and AWS_OP_SUCCESS is returned.
 */
int aws_socket_subscribe_to_readable_events(
    struct aws_socket *socket,
    aws_socket_on_readable_fn *on_readable,
    void *user_data) {

    AWS_LOGF_TRACE(
        AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd);

    const bool read_side_connected = (socket->state & CONNECTED_READ) != 0;
    if (!read_side_connected) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: can't subscribe to readable events since the socket is not connected",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
    }

    const bool already_subscribed = socket->readable_fn != NULL;
    if (already_subscribed) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: can't subscribe to readable events since it is already subscribed",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
    }

    AWS_ASSERT(on_readable);
    socket->readable_fn = on_readable;
    socket->readable_user_data = user_data;

    return AWS_OP_SUCCESS;
}
1702
aws_socket_read(struct aws_socket * socket,struct aws_byte_buf * buffer,size_t * amount_read)1703 int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) {
1704 AWS_ASSERT(amount_read);
1705
1706 if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
1707 AWS_LOGF_ERROR(
1708 AWS_LS_IO_SOCKET,
1709 "id=%p fd=%d: cannot read from a different thread than event loop %p",
1710 (void *)socket,
1711 socket->io_handle.data.fd,
1712 (void *)socket->event_loop);
1713 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
1714 }
1715
1716 if (!(socket->state & CONNECTED_READ)) {
1717 AWS_LOGF_ERROR(
1718 AWS_LS_IO_SOCKET,
1719 "id=%p fd=%d: cannot read because it is not connected",
1720 (void *)socket,
1721 socket->io_handle.data.fd);
1722 return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
1723 }
1724
1725 ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len);
1726 int error = errno;
1727
1728 AWS_LOGF_TRACE(
1729 AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val);
1730
1731 if (read_val > 0) {
1732 *amount_read = (size_t)read_val;
1733 buffer->len += *amount_read;
1734 return AWS_OP_SUCCESS;
1735 }
1736
1737 /* read_val of 0 means EOF which we'll treat as AWS_IO_SOCKET_CLOSED */
1738 if (read_val == 0) {
1739 AWS_LOGF_INFO(
1740 AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd);
1741 *amount_read = 0;
1742
1743 if (buffer->capacity - buffer->len > 0) {
1744 return aws_raise_error(AWS_IO_SOCKET_CLOSED);
1745 }
1746
1747 return AWS_OP_SUCCESS;
1748 }
1749
1750 #if defined(EWOULDBLOCK)
1751 if (error == EAGAIN || error == EWOULDBLOCK) {
1752 #else
1753 if (error == EAGAIN) {
1754 #endif
1755 AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd);
1756 return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
1757 }
1758
1759 if (error == EPIPE) {
1760 AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd);
1761 return aws_raise_error(AWS_IO_SOCKET_CLOSED);
1762 }
1763
1764 if (error == ETIMEDOUT) {
1765 AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd);
1766 return aws_raise_error(AWS_IO_SOCKET_TIMEOUT);
1767 }
1768
1769 AWS_LOGF_ERROR(
1770 AWS_LS_IO_SOCKET,
1771 "id=%p fd=%d: read failed with error: %s",
1772 (void *)socket,
1773 socket->io_handle.data.fd,
1774 strerror(error));
1775 return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
1776 }
1777
1778 int aws_socket_write(
1779 struct aws_socket *socket,
1780 const struct aws_byte_cursor *cursor,
1781 aws_socket_on_write_completed_fn *written_fn,
1782 void *user_data) {
1783 if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
1784 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
1785 }
1786
1787 if (!(socket->state & CONNECTED_WRITE)) {
1788 AWS_LOGF_ERROR(
1789 AWS_LS_IO_SOCKET,
1790 "id=%p fd=%d: cannot write to because it is not connected",
1791 (void *)socket,
1792 socket->io_handle.data.fd);
1793 return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
1794 }
1795
1796 AWS_ASSERT(written_fn);
1797 struct posix_socket *socket_impl = socket->impl;
1798 struct write_request *write_request = aws_mem_calloc(socket->allocator, 1, sizeof(struct write_request));
1799
1800 if (!write_request) {
1801 return AWS_OP_ERR;
1802 }
1803
1804 write_request->original_buffer_len = cursor->len;
1805 write_request->written_fn = written_fn;
1806 write_request->write_user_data = user_data;
1807 write_request->cursor_cpy = *cursor;
1808 aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node);
1809
1810 return s_process_write_requests(socket, write_request);
1811 }
1812
1813 int aws_socket_get_error(struct aws_socket *socket) {
1814 int connect_result;
1815 socklen_t result_length = sizeof(connect_result);
1816
1817 if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
1818 return AWS_OP_ERR;
1819 }
1820
1821 if (connect_result) {
1822 return s_determine_socket_error(connect_result);
1823 }
1824
1825 return AWS_OP_SUCCESS;
1826 }
1827
1828 bool aws_socket_is_open(struct aws_socket *socket) {
1829 return socket->io_handle.data.fd >= 0;
1830 }
1831