1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include "aws/s3/private/s3_auto_ranged_get.h"
7 #include "aws/s3/private/s3_auto_ranged_put.h"
8 #include "aws/s3/private/s3_client_impl.h"
9 #include "aws/s3/private/s3_default_meta_request.h"
10 #include "aws/s3/private/s3_meta_request_impl.h"
11 #include "aws/s3/private/s3_util.h"
12 
13 #include <aws/auth/credentials.h>
14 #include <aws/common/assert.h>
15 #include <aws/common/atomics.h>
16 #include <aws/common/clock.h>
17 #include <aws/common/device_random.h>
18 #include <aws/common/environment.h>
19 #include <aws/common/string.h>
20 #include <aws/common/system_info.h>
21 #include <aws/http/connection.h>
22 #include <aws/http/connection_manager.h>
23 #include <aws/http/proxy.h>
24 #include <aws/http/request_response.h>
25 #include <aws/io/channel_bootstrap.h>
26 #include <aws/io/event_loop.h>
27 #include <aws/io/host_resolver.h>
28 #include <aws/io/retry_strategy.h>
29 #include <aws/io/socket.h>
30 #include <aws/io/stream.h>
31 #include <aws/io/tls_channel_handler.h>
32 #include <aws/io/uri.h>
33 
34 #include <inttypes.h>
35 #include <math.h>
36 
37 #if _MSC_VER
38 #    pragma warning(disable : 4232) /* function pointer to dll symbol */
39 #endif                              /* _MSC_VER */
40 
41 struct aws_s3_meta_request_work {
42     struct aws_linked_list_node node;
43     struct aws_s3_meta_request *meta_request;
44 };
45 
46 static const enum aws_log_level s_log_level_client_stats = AWS_LL_INFO;
47 
48 static const uint32_t s_max_requests_multiplier = 4;
49 
50 /* TODO Provide analysis on origins of this value. */
51 static const double s_throughput_per_vip_gbps = 4.0;
52 
53 /* Preferred amount of active connections per meta request type. */
54 const uint32_t g_num_conns_per_vip_meta_request_look_up[AWS_S3_META_REQUEST_TYPE_MAX] = {
55     10, /* AWS_S3_META_REQUEST_TYPE_DEFAULT */
56     10, /* AWS_S3_META_REQUEST_TYPE_GET_OBJECT */
57     10, /* AWS_S3_META_REQUEST_TYPE_PUT_OBJECT */
58 };
59 
60 /* Should be max of s_num_conns_per_vip_meta_request_look_up */
61 const uint32_t g_max_num_connections_per_vip = 10;
62 
63 /**
64  * Default part size is 8 MB to reach the best performance from the experiments we had.
65  * Default max part size is SIZE_MAX at 32bit build, which is around 4GB, which is 5GB at 64bit build.
66  *      The server limit is 5GB, but object size limit is 5TB for now. We should be good enough for all the case.
67  *      For upload, the max number of parts is 10000, which will limits the object size to 40TB for 32bit and 50TB for
68  *      64bit.
69  * TODO Provide more information on other values.
70  */
71 static const size_t s_default_part_size = 8 * 1024 * 1024;
72 static const size_t s_default_max_part_size = SIZE_MAX < 5000000000000ULL ? SIZE_MAX : 5000000000000ULL;
73 static const double s_default_throughput_target_gbps = 10.0;
74 static const uint32_t s_default_max_retries = 5;
75 static size_t s_dns_host_address_ttl_seconds = 5 * 60;
76 
77 /* Called when ref count is 0. */
78 static void s_s3_client_start_destroy(void *user_data);
79 
80 /* Called by s_s3_client_process_work_default when all shutdown criteria has been met. */
81 static void s_s3_client_finish_destroy_default(struct aws_s3_client *client);
82 
83 /* Called when the body streaming elg shutdown has completed. */
84 static void s_s3_client_body_streaming_elg_shutdown(void *user_data);
85 
86 static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request);
87 
88 /* Callback which handles the HTTP connection retrieved by acquire_http_connection. */
89 static void s_s3_client_on_acquire_http_connection(
90     struct aws_http_connection *http_connection,
91     int error_code,
92     void *user_data);
93 
94 static void s_s3_client_push_meta_request_synced(
95     struct aws_s3_client *client,
96     struct aws_s3_meta_request *meta_request);
97 
98 /* Schedule task for processing work. (Calls the corresponding vtable function.) */
99 static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client);
100 
101 /* Default implementation for scheduling processing of work. */
102 static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client);
103 
104 /* Actual task function that processes work. */
105 static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status);
106 
107 static void s_s3_client_process_work_default(struct aws_s3_client *client);
108 
109 static bool s_s3_client_endpoint_ref_count_zero(struct aws_s3_endpoint *endpoint);
110 
111 static void s_s3_client_endpoint_shutdown_callback(void *user_data);
112 
113 /* Default factory function for creating a meta request. */
114 static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
115     struct aws_s3_client *client,
116     const struct aws_s3_meta_request_options *options);
117 
118 static struct aws_s3_client_vtable s_s3_client_default_vtable = {
119     .meta_request_factory = s_s3_client_meta_request_factory_default,
120     .acquire_http_connection = aws_http_connection_manager_acquire_connection,
121     .get_host_address_count = aws_host_resolver_get_host_address_count,
122     .schedule_process_work_synced = s_s3_client_schedule_process_work_synced_default,
123     .process_work = s_s3_client_process_work_default,
124     .endpoint_ref_count_zero = s_s3_client_endpoint_ref_count_zero,
125     .endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
126     .finish_destroy = s_s3_client_finish_destroy_default,
127 };
128 
aws_s3_set_dns_ttl(size_t ttl)129 void aws_s3_set_dns_ttl(size_t ttl) {
130     s_dns_host_address_ttl_seconds = ttl;
131 }
132 
133 /* Returns the max number of connections allowed.
134  *
135  * When meta request is NULL, this will return the overall allowed number of connections.
136  *
137  * If meta_request is not NULL, this will give the max number of connections allowed for that meta request type on
138  * thatendpoint.
139  */
aws_s3_client_get_max_active_connections(struct aws_s3_client * client,struct aws_s3_meta_request * meta_request)140 uint32_t aws_s3_client_get_max_active_connections(
141     struct aws_s3_client *client,
142     struct aws_s3_meta_request *meta_request) {
143     AWS_PRECONDITION(client);
144 
145     uint32_t num_connections_per_vip = g_max_num_connections_per_vip;
146     uint32_t num_vips = client->ideal_vip_count;
147 
148     if (meta_request != NULL) {
149         num_connections_per_vip = g_num_conns_per_vip_meta_request_look_up[meta_request->type];
150 
151         struct aws_s3_endpoint *endpoint = meta_request->endpoint;
152         AWS_ASSERT(endpoint != NULL);
153 
154         AWS_ASSERT(client->vtable->get_host_address_count);
155         size_t num_known_vips = client->vtable->get_host_address_count(
156             client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
157 
158         /* If the number of known vips is less than our ideal VIP count, clamp it. */
159         if (num_known_vips < (size_t)num_vips) {
160             num_vips = (uint32_t)num_known_vips;
161         }
162     }
163 
164     /* We always want to allow for at least one VIP worth of connections. */
165     if (num_vips == 0) {
166         num_vips = 1;
167     }
168 
169     uint32_t max_active_connections = num_vips * num_connections_per_vip;
170 
171     if (client->max_active_connections_override > 0 &&
172         client->max_active_connections_override < max_active_connections) {
173         max_active_connections = client->max_active_connections_override;
174     }
175 
176     return max_active_connections;
177 }
178 
179 /* Returns the max number of requests allowed to be in memory */
aws_s3_client_get_max_requests_in_flight(struct aws_s3_client * client)180 uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
181     AWS_PRECONDITION(client);
182     return aws_s3_client_get_max_active_connections(client, NULL) * s_max_requests_multiplier;
183 }
184 
185 /* Returns the max number of requests that should be in preparation stage (ie: reading from a stream, being signed,
186  * etc.) */
aws_s3_client_get_max_requests_prepare(struct aws_s3_client * client)187 uint32_t aws_s3_client_get_max_requests_prepare(struct aws_s3_client *client) {
188     return aws_s3_client_get_max_active_connections(client, NULL);
189 }
190 
s_s3_client_get_num_requests_network_io(struct aws_s3_client * client,enum aws_s3_meta_request_type meta_request_type)191 static uint32_t s_s3_client_get_num_requests_network_io(
192     struct aws_s3_client *client,
193     enum aws_s3_meta_request_type meta_request_type) {
194     AWS_PRECONDITION(client);
195 
196     uint32_t num_requests_network_io = 0;
197 
198     if (meta_request_type == AWS_S3_META_REQUEST_TYPE_MAX) {
199         for (uint32_t i = 0; i < AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
200             num_requests_network_io += (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[i]);
201         }
202     } else {
203         num_requests_network_io =
204             (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[meta_request_type]);
205     }
206 
207     return num_requests_network_io;
208 }
209 
aws_s3_client_lock_synced_data(struct aws_s3_client * client)210 void aws_s3_client_lock_synced_data(struct aws_s3_client *client) {
211     aws_mutex_lock(&client->synced_data.lock);
212 }
213 
aws_s3_client_unlock_synced_data(struct aws_s3_client * client)214 void aws_s3_client_unlock_synced_data(struct aws_s3_client *client) {
215     aws_mutex_unlock(&client->synced_data.lock);
216 }
217 
aws_s3_client_new(struct aws_allocator * allocator,const struct aws_s3_client_config * client_config)218 struct aws_s3_client *aws_s3_client_new(
219     struct aws_allocator *allocator,
220     const struct aws_s3_client_config *client_config) {
221 
222     AWS_PRECONDITION(allocator);
223     AWS_PRECONDITION(client_config);
224 
225     if (client_config->client_bootstrap == NULL) {
226         AWS_LOGF_ERROR(
227             AWS_LS_S3_CLIENT,
228             "Cannot create client from client_config; client_bootstrap provided in options is invalid.");
229         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
230         return NULL;
231     }
232 
233     /* Cannot be less than zero.  If zero, use default. */
234     if (client_config->throughput_target_gbps < 0.0) {
235         AWS_LOGF_ERROR(
236             AWS_LS_S3_CLIENT,
237             "Cannot create client from client_config; throughput_target_gbps cannot less than or equal to 0.");
238         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
239         return NULL;
240     }
241 
242 #ifdef BYO_CRYPTO
243     if (client_config->tls_mode == AWS_MR_TLS_ENABLED && client_config->tls_connection_options == NULL) {
244         AWS_LOGF_ERROR(
245             AWS_LS_S3_CLIENT,
246             "Cannot create client from client_config; when using BYO_CRYPTO, tls_connection_options can not be "
247             "NULL when TLS is enabled.");
248         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
249         return NULL;
250     }
251 #endif
252 
253     struct aws_s3_client *client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client));
254 
255     client->allocator = allocator;
256     client->vtable = &s_s3_client_default_vtable;
257 
258     aws_ref_count_init(&client->ref_count, client, (aws_simple_completion_callback *)s_s3_client_start_destroy);
259 
260     if (aws_mutex_init(&client->synced_data.lock) != AWS_OP_SUCCESS) {
261         goto lock_init_fail;
262     }
263 
264     aws_linked_list_init(&client->synced_data.pending_meta_request_work);
265     aws_linked_list_init(&client->synced_data.prepared_requests);
266 
267     aws_linked_list_init(&client->threaded_data.meta_requests);
268     aws_linked_list_init(&client->threaded_data.request_queue);
269 
270     aws_atomic_init_int(&client->stats.num_requests_in_flight, 0);
271 
272     for (uint32_t i = 0; i < (uint32_t)AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
273         aws_atomic_init_int(&client->stats.num_requests_network_io[i], 0);
274     }
275 
276     aws_atomic_init_int(&client->stats.num_requests_stream_queued_waiting, 0);
277     aws_atomic_init_int(&client->stats.num_requests_streaming, 0);
278 
279     *((uint32_t *)&client->max_active_connections_override) = client_config->max_active_connections_override;
280 
281     /* Store our client bootstrap. */
282     client->client_bootstrap = aws_client_bootstrap_acquire(client_config->client_bootstrap);
283 
284     struct aws_event_loop_group *event_loop_group = client_config->client_bootstrap->event_loop_group;
285     aws_event_loop_group_acquire(event_loop_group);
286 
287     client->process_work_event_loop = aws_event_loop_group_get_next_loop(event_loop_group);
288 
289     /* Set up body streaming ELG */
290     {
291         uint16_t num_event_loops =
292             (uint16_t)aws_array_list_length(&client->client_bootstrap->event_loop_group->event_loops);
293         uint16_t num_streaming_threads = num_event_loops;
294 
295         if (num_streaming_threads < 1) {
296             num_streaming_threads = 1;
297         }
298 
299         struct aws_shutdown_callback_options body_streaming_elg_shutdown_options = {
300             .shutdown_callback_fn = s_s3_client_body_streaming_elg_shutdown,
301             .shutdown_callback_user_data = client,
302         };
303 
304         if (aws_get_cpu_group_count() > 1) {
305             client->body_streaming_elg = aws_event_loop_group_new_default_pinned_to_cpu_group(
306                 client->allocator, num_streaming_threads, 1, &body_streaming_elg_shutdown_options);
307         } else {
308             client->body_streaming_elg = aws_event_loop_group_new_default(
309                 client->allocator, num_streaming_threads, &body_streaming_elg_shutdown_options);
310         }
311         if (!client->body_streaming_elg) {
312             /* Fail to create elg, we should fail the call */
313             goto elg_create_fail;
314         }
315         client->synced_data.body_streaming_elg_allocated = true;
316     }
317 
318     /* Make a copy of the region string. */
319     client->region = aws_string_new_from_array(allocator, client_config->region.ptr, client_config->region.len);
320 
321     if (client_config->part_size != 0) {
322         *((size_t *)&client->part_size) = client_config->part_size;
323     } else {
324         *((size_t *)&client->part_size) = s_default_part_size;
325     }
326 
327     if (client_config->max_part_size != 0) {
328         *((size_t *)&client->max_part_size) = client_config->max_part_size;
329     } else {
330         *((size_t *)&client->max_part_size) = s_default_max_part_size;
331     }
332 
333     if (client_config->max_part_size < client_config->part_size) {
334         *((size_t *)&client_config->max_part_size) = client_config->part_size;
335     }
336 
337     if (client_config->tls_mode == AWS_MR_TLS_ENABLED) {
338         client->tls_connection_options =
339             aws_mem_calloc(client->allocator, 1, sizeof(struct aws_tls_connection_options));
340 
341         if (client_config->tls_connection_options != NULL) {
342             aws_tls_connection_options_copy(client->tls_connection_options, client_config->tls_connection_options);
343         } else {
344 #ifdef BYO_CRYPTO
345             AWS_FATAL_ASSERT(false);
346             goto on_error;
347 #else
348             struct aws_tls_ctx_options default_tls_ctx_options;
349             AWS_ZERO_STRUCT(default_tls_ctx_options);
350 
351             aws_tls_ctx_options_init_default_client(&default_tls_ctx_options, allocator);
352 
353             struct aws_tls_ctx *default_tls_ctx = aws_tls_client_ctx_new(allocator, &default_tls_ctx_options);
354             if (default_tls_ctx == NULL) {
355                 goto on_error;
356             }
357 
358             aws_tls_connection_options_init_from_ctx(client->tls_connection_options, default_tls_ctx);
359 
360             aws_tls_ctx_release(default_tls_ctx);
361             aws_tls_ctx_options_clean_up(&default_tls_ctx_options);
362 #endif
363         }
364     }
365 
366     if (client_config->throughput_target_gbps != 0.0) {
367         *((double *)&client->throughput_target_gbps) = client_config->throughput_target_gbps;
368     } else {
369         *((double *)&client->throughput_target_gbps) = s_default_throughput_target_gbps;
370     }
371 
372     *((enum aws_s3_meta_request_compute_content_md5 *)&client->compute_content_md5) =
373         client_config->compute_content_md5;
374 
375     /* Determine how many vips are ideal by dividing target-throughput by throughput-per-vip. */
376     {
377         double ideal_vip_count_double = client->throughput_target_gbps / s_throughput_per_vip_gbps;
378         *((uint32_t *)&client->ideal_vip_count) = (uint32_t)ceil(ideal_vip_count_double);
379     }
380 
381     if (client_config->signing_config) {
382         client->cached_signing_config = aws_cached_signing_config_new(client->allocator, client_config->signing_config);
383     }
384 
385     client->synced_data.active = true;
386 
387     if (client_config->retry_strategy != NULL) {
388         aws_retry_strategy_acquire(client_config->retry_strategy);
389         client->retry_strategy = client_config->retry_strategy;
390     } else {
391         struct aws_exponential_backoff_retry_options backoff_retry_options = {
392             .el_group = client_config->client_bootstrap->event_loop_group,
393             .max_retries = s_default_max_retries,
394         };
395 
396         struct aws_standard_retry_options retry_options = {
397             .backoff_retry_options = backoff_retry_options,
398         };
399 
400         client->retry_strategy = aws_retry_strategy_new_standard(allocator, &retry_options);
401     }
402 
403     aws_hash_table_init(
404         &client->synced_data.endpoints,
405         client->allocator,
406         10,
407         aws_hash_string,
408         aws_hash_callback_string_eq,
409         aws_hash_callback_string_destroy,
410         NULL);
411 
412     /* Initialize shutdown options and tracking. */
413     client->shutdown_callback = client_config->shutdown_callback;
414     client->shutdown_callback_user_data = client_config->shutdown_callback_user_data;
415 
416     return client;
417 
418 on_error:
419     aws_event_loop_group_release(client->body_streaming_elg);
420     client->body_streaming_elg = NULL;
421     if (client->tls_connection_options) {
422         aws_tls_connection_options_clean_up(client->tls_connection_options);
423         aws_mem_release(client->allocator, client->tls_connection_options);
424         client->tls_connection_options = NULL;
425     }
426 elg_create_fail:
427     aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
428     aws_client_bootstrap_release(client->client_bootstrap);
429     aws_mutex_clean_up(&client->synced_data.lock);
430 lock_init_fail:
431     aws_mem_release(client->allocator, client);
432     return NULL;
433 }
434 
aws_s3_client_acquire(struct aws_s3_client * client)435 void aws_s3_client_acquire(struct aws_s3_client *client) {
436     AWS_PRECONDITION(client);
437 
438     aws_ref_count_acquire(&client->ref_count);
439 }
440 
aws_s3_client_release(struct aws_s3_client * client)441 void aws_s3_client_release(struct aws_s3_client *client) {
442     if (client == NULL) {
443         return;
444     }
445 
446     aws_ref_count_release(&client->ref_count);
447 }
448 
s_s3_client_start_destroy(void * user_data)449 static void s_s3_client_start_destroy(void *user_data) {
450     struct aws_s3_client *client = user_data;
451     AWS_PRECONDITION(client);
452 
453     AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client starting destruction.", (void *)client);
454 
455     struct aws_linked_list local_vip_list;
456     aws_linked_list_init(&local_vip_list);
457 
458     aws_s3_client_lock_synced_data(client);
459 
460     client->synced_data.active = false;
461 
462     /* Prevent the client from cleaning up inbetween the mutex unlock/re-lock below.*/
463     client->synced_data.start_destroy_executing = true;
464 
465     aws_s3_client_unlock_synced_data(client);
466 
467     aws_event_loop_group_release(client->body_streaming_elg);
468     client->body_streaming_elg = NULL;
469 
470     aws_s3_client_lock_synced_data(client);
471     client->synced_data.start_destroy_executing = false;
472 
473     /* Schedule the work task to clean up outstanding connections and to call s_s3_client_finish_destroy function if
474      * everything cleaning up asynchronously has finished.  */
475     s_s3_client_schedule_process_work_synced(client);
476     aws_s3_client_unlock_synced_data(client);
477 }
478 
s_s3_client_finish_destroy_default(struct aws_s3_client * client)479 static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
480     AWS_PRECONDITION(client);
481 
482     AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client);
483 
484     aws_string_destroy(client->region);
485     client->region = NULL;
486 
487     if (client->tls_connection_options) {
488         aws_tls_connection_options_clean_up(client->tls_connection_options);
489         aws_mem_release(client->allocator, client->tls_connection_options);
490         client->tls_connection_options = NULL;
491     }
492 
493     aws_mutex_clean_up(&client->synced_data.lock);
494 
495     AWS_ASSERT(aws_linked_list_empty(&client->synced_data.pending_meta_request_work));
496     AWS_ASSERT(aws_linked_list_empty(&client->threaded_data.meta_requests));
497     aws_hash_table_clean_up(&client->synced_data.endpoints);
498 
499     aws_retry_strategy_release(client->retry_strategy);
500 
501     aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
502 
503     aws_client_bootstrap_release(client->client_bootstrap);
504     aws_cached_signing_config_destroy(client->cached_signing_config);
505 
506     aws_s3_client_shutdown_complete_callback_fn *shutdown_callback = client->shutdown_callback;
507     void *shutdown_user_data = client->shutdown_callback_user_data;
508 
509     aws_mem_release(client->allocator, client);
510     client = NULL;
511 
512     if (shutdown_callback != NULL) {
513         shutdown_callback(shutdown_user_data);
514     }
515 }
516 
s_s3_client_body_streaming_elg_shutdown(void * user_data)517 static void s_s3_client_body_streaming_elg_shutdown(void *user_data) {
518     struct aws_s3_client *client = user_data;
519     AWS_PRECONDITION(client);
520 
521     AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client body streaming ELG shutdown.", (void *)client);
522 
523     aws_s3_client_lock_synced_data(client);
524     client->synced_data.body_streaming_elg_allocated = false;
525     s_s3_client_schedule_process_work_synced(client);
526     aws_s3_client_unlock_synced_data(client);
527 }
528 
aws_s3_client_queue_requests_threaded(struct aws_s3_client * client,struct aws_linked_list * request_list,bool queue_front)529 uint32_t aws_s3_client_queue_requests_threaded(
530     struct aws_s3_client *client,
531     struct aws_linked_list *request_list,
532     bool queue_front) {
533     AWS_PRECONDITION(client);
534     AWS_PRECONDITION(request_list);
535 
536     uint32_t request_list_size = 0;
537 
538     for (struct aws_linked_list_node *node = aws_linked_list_begin(request_list);
539          node != aws_linked_list_end(request_list);
540          node = aws_linked_list_next(node)) {
541         ++request_list_size;
542     }
543 
544     if (queue_front) {
545         aws_linked_list_move_all_front(&client->threaded_data.request_queue, request_list);
546     } else {
547         aws_linked_list_move_all_back(&client->threaded_data.request_queue, request_list);
548     }
549 
550     client->threaded_data.request_queue_size += request_list_size;
551     return request_list_size;
552 }
553 
aws_s3_client_dequeue_request_threaded(struct aws_s3_client * client)554 struct aws_s3_request *aws_s3_client_dequeue_request_threaded(struct aws_s3_client *client) {
555     AWS_PRECONDITION(client);
556 
557     if (aws_linked_list_empty(&client->threaded_data.request_queue)) {
558         return NULL;
559     }
560 
561     struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&client->threaded_data.request_queue);
562     struct aws_s3_request *request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
563 
564     --client->threaded_data.request_queue_size;
565 
566     return request;
567 }
568 
569 /* Public facing make-meta-request function. */
aws_s3_client_make_meta_request(struct aws_s3_client * client,const struct aws_s3_meta_request_options * options)570 struct aws_s3_meta_request *aws_s3_client_make_meta_request(
571     struct aws_s3_client *client,
572     const struct aws_s3_meta_request_options *options) {
573 
574     AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p Initiating making of meta request", (void *)client);
575 
576     AWS_PRECONDITION(client);
577     AWS_PRECONDITION(client->vtable);
578     AWS_PRECONDITION(client->vtable->meta_request_factory);
579     AWS_PRECONDITION(options);
580 
581     if (options->type != AWS_S3_META_REQUEST_TYPE_DEFAULT && options->type != AWS_S3_META_REQUEST_TYPE_GET_OBJECT &&
582         options->type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
583         AWS_LOGF_ERROR(
584             AWS_LS_S3_CLIENT,
585             "id=%p Cannot create meta s3 request; invalid meta request type specified.",
586             (void *)client);
587         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
588         return NULL;
589     }
590 
591     if (options->message == NULL) {
592         AWS_LOGF_ERROR(
593             AWS_LS_S3_CLIENT,
594             "id=%p Cannot create meta s3 request; message provided in options is invalid.",
595             (void *)client);
596         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
597         return NULL;
598     }
599 
600     struct aws_http_headers *message_headers = aws_http_message_get_headers(options->message);
601 
602     if (message_headers == NULL) {
603         AWS_LOGF_ERROR(
604             AWS_LS_S3_CLIENT,
605             "id=%p Cannot create meta s3 request; message provided in options does not contain headers.",
606             (void *)client);
607         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
608         return NULL;
609     }
610 
611     struct aws_byte_cursor host_header_value;
612 
613     if (aws_http_headers_get(message_headers, g_host_header_name, &host_header_value)) {
614         AWS_LOGF_ERROR(
615             AWS_LS_S3_CLIENT,
616             "id=%p Cannot create meta s3 request; message provided in options does not have a 'Host' header.",
617             (void *)client);
618         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
619         return NULL;
620     }
621 
622     struct aws_s3_meta_request *meta_request = client->vtable->meta_request_factory(client, options);
623 
624     if (meta_request == NULL) {
625         AWS_LOGF_ERROR(AWS_LS_S3_CLIENT, "id=%p: Could not create new meta request.", (void *)client);
626         return NULL;
627     }
628 
629     bool error_occurred = false;
630 
631     aws_s3_client_lock_synced_data(client);
632 
633     struct aws_string *endpoint_host_name = aws_string_new_from_cursor(client->allocator, &host_header_value);
634 
635     struct aws_s3_endpoint *endpoint = NULL;
636     struct aws_hash_element *endpoint_hash_element = NULL;
637 
638     int was_created = 0;
639 
640     if (aws_hash_table_create(
641             &client->synced_data.endpoints, endpoint_host_name, &endpoint_hash_element, &was_created)) {
642         error_occurred = true;
643         goto unlock;
644     }
645 
646     if (was_created) {
647         struct aws_s3_endpoint_options endpoint_options = {
648             .host_name = endpoint_host_name,
649             .ref_count_zero_callback = client->vtable->endpoint_ref_count_zero,
650             .shutdown_callback = client->vtable->endpoint_shutdown_callback,
651             .client_bootstrap = client->client_bootstrap,
652             .tls_connection_options = client->tls_connection_options,
653             .dns_host_address_ttl_seconds = s_dns_host_address_ttl_seconds,
654             .user_data = client,
655             .max_connections = aws_s3_client_get_max_active_connections(client, NULL),
656         };
657 
658         endpoint = aws_s3_endpoint_new(client->allocator, &endpoint_options);
659 
660         if (endpoint == NULL) {
661             aws_hash_table_remove(&client->synced_data.endpoints, endpoint_host_name, NULL, NULL);
662             error_occurred = true;
663             goto unlock;
664         }
665 
666         endpoint_hash_element->value = endpoint;
667         ++client->synced_data.num_endpoints_allocated;
668     } else {
669         endpoint = aws_s3_endpoint_acquire(endpoint_hash_element->value);
670 
671         aws_string_destroy(endpoint_host_name);
672         endpoint_host_name = NULL;
673     }
674 
675     meta_request->endpoint = endpoint;
676 
677     s_s3_client_push_meta_request_synced(client, meta_request);
678     s_s3_client_schedule_process_work_synced(client);
679 
680 unlock:
681     aws_s3_client_unlock_synced_data(client);
682 
683     if (error_occurred) {
684         AWS_LOGF_ERROR(
685             AWS_LS_S3_CLIENT,
686             "id=%p Could not create meta request due to error %d (%s)",
687             (void *)client,
688             aws_last_error(),
689             aws_error_str(aws_last_error()));
690 
691         aws_s3_meta_request_release(meta_request);
692         meta_request = NULL;
693     } else {
694         AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p: Created meta request %p", (void *)client, (void *)meta_request);
695     }
696 
697     return meta_request;
698 }
699 
s_s3_client_endpoint_ref_count_zero(struct aws_s3_endpoint * endpoint)700 static bool s_s3_client_endpoint_ref_count_zero(struct aws_s3_endpoint *endpoint) {
701     AWS_PRECONDITION(endpoint);
702 
703     struct aws_s3_client *client = endpoint->user_data;
704     AWS_PRECONDITION(client);
705 
706     bool clean_up_endpoint = false;
707 
708     aws_s3_client_lock_synced_data(client);
709 
710     /* It is possible that before we were able to acquire the lock here, the critical section used for looking up the
711      * endpoint in the table and assigning it to a new meta request was called in a different thread. To handle this
712      * case, we check the ref count before removing it.*/
713     if (aws_atomic_load_int(&endpoint->ref_count.ref_count) == 0) {
714         aws_hash_table_remove(&client->synced_data.endpoints, endpoint->host_name, NULL, NULL);
715         clean_up_endpoint = true;
716     }
717 
718     aws_s3_client_unlock_synced_data(client);
719 
720     return clean_up_endpoint;
721 }
722 
s_s3_client_endpoint_shutdown_callback(void * user_data)723 static void s_s3_client_endpoint_shutdown_callback(void *user_data) {
724     struct aws_s3_client *client = user_data;
725     AWS_PRECONDITION(client);
726 
727     aws_s3_client_lock_synced_data(client);
728     --client->synced_data.num_endpoints_allocated;
729     s_s3_client_schedule_process_work_synced(client);
730     aws_s3_client_unlock_synced_data(client);
731 }
732 
s_s3_client_meta_request_factory_default(struct aws_s3_client * client,const struct aws_s3_meta_request_options * options)733 static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
734     struct aws_s3_client *client,
735     const struct aws_s3_meta_request_options *options) {
736     AWS_PRECONDITION(client);
737     AWS_PRECONDITION(options);
738 
739     struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(options->message);
740     AWS_ASSERT(initial_message_headers);
741 
742     uint64_t content_length = 0;
743     struct aws_byte_cursor content_length_cursor;
744     bool content_length_header_found = false;
745 
746     if (!aws_http_headers_get(initial_message_headers, g_content_length_header_name, &content_length_cursor)) {
747         struct aws_string *content_length_str = aws_string_new_from_cursor(client->allocator, &content_length_cursor);
748         char *content_length_str_end = NULL;
749 
750         content_length = strtoull((const char *)content_length_str->bytes, &content_length_str_end, 10);
751         aws_string_destroy(content_length_str);
752 
753         content_length_str = NULL;
754         content_length_header_found = true;
755     }
756 
757     /* Call the appropriate meta-request new function. */
758     if (options->type == AWS_S3_META_REQUEST_TYPE_GET_OBJECT) {
759 
760         /* If the initial request already has partNumber, the request is not splittable(?). Treat it as a Default
761          * request.
762          * TODO: Still need tests to verify that the request of a part is splittable or not */
763         if (aws_http_headers_has(initial_message_headers, aws_byte_cursor_from_c_str("partNumber"))) {
764             return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
765         }
766 
767         return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, client->part_size, options);
768     } else if (options->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
769 
770         if (!content_length_header_found) {
771             AWS_LOGF_ERROR(
772                 AWS_LS_S3_META_REQUEST,
773                 "Could not create auto-ranged-put meta request; there is no Content-Length header present.");
774             aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
775             return NULL;
776         }
777 
778         struct aws_input_stream *input_stream = aws_http_message_get_body_stream(options->message);
779 
780         if (input_stream == NULL) {
781             AWS_LOGF_ERROR(
782                 AWS_LS_S3_META_REQUEST, "Could not create auto-ranged-put meta request; body stream is NULL.");
783             aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
784             return NULL;
785         }
786 
787         size_t client_part_size = client->part_size;
788         size_t client_max_part_size = client->max_part_size;
789 
790         if (client_part_size < g_s3_min_upload_part_size) {
791             AWS_LOGF_WARN(
792                 AWS_LS_S3_META_REQUEST,
793                 "Client config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
794                 ". Using to the minimum part-size for upload.",
795                 (uint64_t)client_part_size,
796                 (uint64_t)g_s3_min_upload_part_size);
797 
798             client_part_size = g_s3_min_upload_part_size;
799         }
800 
801         if (client_max_part_size < g_s3_min_upload_part_size) {
802             AWS_LOGF_WARN(
803                 AWS_LS_S3_META_REQUEST,
804                 "Client config max part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
805                 ". Clamping to the minimum part-size for upload.",
806                 (uint64_t)client_max_part_size,
807                 (uint64_t)g_s3_min_upload_part_size);
808 
809             client_max_part_size = g_s3_min_upload_part_size;
810         }
811 
812         if (content_length < client_part_size) {
813             return aws_s3_meta_request_default_new(
814                 client->allocator,
815                 client,
816                 content_length,
817                 client->compute_content_md5 == AWS_MR_CONTENT_MD5_ENABLED &&
818                     !aws_http_headers_has(initial_message_headers, g_content_md5_header_name),
819                 options);
820         }
821 
822         uint64_t part_size_uint64 = content_length / (uint64_t)g_s3_max_num_upload_parts;
823 
824         if (part_size_uint64 > SIZE_MAX) {
825             AWS_LOGF_ERROR(
826                 AWS_LS_S3_META_REQUEST,
827                 "Could not create auto-ranged-put meta request; required part size of %" PRIu64
828                 " bytes is too large for platform.",
829                 part_size_uint64);
830 
831             aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
832             return NULL;
833         }
834 
835         size_t part_size = (size_t)part_size_uint64;
836 
837         if (part_size > client_max_part_size) {
838             AWS_LOGF_ERROR(
839                 AWS_LS_S3_META_REQUEST,
840                 "Could not create auto-ranged-put meta request; required part size for put request is %" PRIu64
841                 ", but current maximum part size is %" PRIu64,
842                 (uint64_t)part_size,
843                 (uint64_t)client_max_part_size);
844             aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
845             return NULL;
846         }
847 
848         if (part_size < client_part_size) {
849             part_size = client_part_size;
850         }
851 
852         uint32_t num_parts = (uint32_t)(content_length / part_size);
853 
854         if ((content_length % part_size) > 0) {
855             ++num_parts;
856         }
857 
858         return aws_s3_meta_request_auto_ranged_put_new(
859             client->allocator, client, part_size, content_length, num_parts, options);
860     } else if (options->type == AWS_S3_META_REQUEST_TYPE_DEFAULT) {
861         return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
862     } else {
863         AWS_FATAL_ASSERT(false);
864     }
865 
866     return NULL;
867 }
868 
s_s3_client_push_meta_request_synced(struct aws_s3_client * client,struct aws_s3_meta_request * meta_request)869 static void s_s3_client_push_meta_request_synced(
870     struct aws_s3_client *client,
871     struct aws_s3_meta_request *meta_request) {
872     AWS_PRECONDITION(client);
873     AWS_PRECONDITION(meta_request);
874     ASSERT_SYNCED_DATA_LOCK_HELD(client);
875 
876     struct aws_s3_meta_request_work *meta_request_work =
877         aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_meta_request_work));
878 
879     aws_s3_meta_request_acquire(meta_request);
880     meta_request_work->meta_request = meta_request;
881     aws_linked_list_push_back(&client->synced_data.pending_meta_request_work, &meta_request_work->node);
882 }
883 
s_s3_client_schedule_process_work_synced(struct aws_s3_client * client)884 static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client) {
885     AWS_PRECONDITION(client);
886     AWS_PRECONDITION(client->vtable);
887     AWS_PRECONDITION(client->vtable->schedule_process_work_synced);
888 
889     ASSERT_SYNCED_DATA_LOCK_HELD(client);
890 
891     client->vtable->schedule_process_work_synced(client);
892 }
893 
s_s3_client_schedule_process_work_synced_default(struct aws_s3_client * client)894 static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client) {
895     ASSERT_SYNCED_DATA_LOCK_HELD(client);
896 
897     if (client->synced_data.process_work_task_scheduled) {
898         return;
899     }
900 
901     aws_task_init(
902         &client->synced_data.process_work_task, s_s3_client_process_work_task, client, "s3_client_process_work_task");
903 
904     aws_event_loop_schedule_task_now(client->process_work_event_loop, &client->synced_data.process_work_task);
905 
906     client->synced_data.process_work_task_scheduled = true;
907 }
908 
aws_s3_client_schedule_process_work(struct aws_s3_client * client)909 void aws_s3_client_schedule_process_work(struct aws_s3_client *client) {
910     AWS_PRECONDITION(client);
911 
912     aws_s3_client_lock_synced_data(client);
913     s_s3_client_schedule_process_work_synced(client);
914     aws_s3_client_unlock_synced_data(client);
915 }
916 
s_s3_client_remove_meta_request_threaded(struct aws_s3_client * client,struct aws_s3_meta_request * meta_request)917 static void s_s3_client_remove_meta_request_threaded(
918     struct aws_s3_client *client,
919     struct aws_s3_meta_request *meta_request) {
920     AWS_PRECONDITION(client);
921     AWS_PRECONDITION(meta_request);
922     (void)client;
923 
924     aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
925     meta_request->client_process_work_threaded_data.scheduled = false;
926     aws_s3_meta_request_release(meta_request);
927 }
928 
929 /* Task function for trying to find a request that can be processed. */
s_s3_client_process_work_task(struct aws_task * task,void * arg,enum aws_task_status task_status)930 static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
931     AWS_PRECONDITION(task);
932     (void)task;
933     (void)task_status;
934 
935     /* Client keeps a reference to the event loop group; a 'canceled' status should not happen.*/
936     AWS_ASSERT(task_status == AWS_TASK_STATUS_RUN_READY);
937 
938     struct aws_s3_client *client = arg;
939     AWS_PRECONDITION(client);
940     AWS_PRECONDITION(client->vtable);
941     AWS_PRECONDITION(client->vtable->process_work);
942 
943     client->vtable->process_work(client);
944 }
945 
s_s3_client_process_work_default(struct aws_s3_client * client)946 static void s_s3_client_process_work_default(struct aws_s3_client *client) {
947     AWS_PRECONDITION(client);
948     AWS_PRECONDITION(client->vtable);
949     AWS_PRECONDITION(client->vtable->finish_destroy);
950 
951     struct aws_linked_list meta_request_work_list;
952     aws_linked_list_init(&meta_request_work_list);
953 
954     /*******************/
955     /* Step 1: Move relevant data into thread local memory. */
956     /*******************/
957     AWS_LOGF_DEBUG(
958         AWS_LS_S3_CLIENT,
959         "id=%p s_s3_client_process_work_default - Moving relevant synced_data into threaded_data.",
960         (void *)client);
961     aws_s3_client_lock_synced_data(client);
962 
963     /* Once we exit this mutex, someone can reschedule this task. */
964     client->synced_data.process_work_task_scheduled = false;
965     client->synced_data.process_work_task_in_progress = true;
966 
967     aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);
968 
969     uint32_t num_requests_queued =
970         aws_s3_client_queue_requests_threaded(client, &client->synced_data.prepared_requests, false);
971 
972     {
973         int sub_result = aws_sub_u32_checked(
974             client->threaded_data.num_requests_being_prepared,
975             num_requests_queued,
976             &client->threaded_data.num_requests_being_prepared);
977 
978         AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
979         (void)sub_result;
980     }
981 
982     {
983         int sub_result = aws_sub_u32_checked(
984             client->threaded_data.num_requests_being_prepared,
985             client->synced_data.num_failed_prepare_requests,
986             &client->threaded_data.num_requests_being_prepared);
987 
988         client->synced_data.num_failed_prepare_requests = 0;
989 
990         AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
991         (void)sub_result;
992     }
993 
994     uint32_t num_endpoints_in_table = (uint32_t)aws_hash_table_get_entry_count(&client->synced_data.endpoints);
995     uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;
996 
997     aws_s3_client_unlock_synced_data(client);
998 
999     /*******************/
1000     /* Step 2: Push meta requests into the thread local list if they haven't already been scheduled. */
1001     /*******************/
1002     AWS_LOGF_DEBUG(
1003         AWS_LS_S3_CLIENT, "id=%p s_s3_client_process_work_default - Processing any new meta requests.", (void *)client);
1004 
1005     while (!aws_linked_list_empty(&meta_request_work_list)) {
1006         struct aws_linked_list_node *node = aws_linked_list_pop_back(&meta_request_work_list);
1007         struct aws_s3_meta_request_work *meta_request_work =
1008             AWS_CONTAINER_OF(node, struct aws_s3_meta_request_work, node);
1009 
1010         AWS_FATAL_ASSERT(meta_request_work != NULL);
1011         AWS_FATAL_ASSERT(meta_request_work->meta_request != NULL);
1012 
1013         struct aws_s3_meta_request *meta_request = meta_request_work->meta_request;
1014 
1015         if (!meta_request->client_process_work_threaded_data.scheduled) {
1016             aws_linked_list_push_back(
1017                 &client->threaded_data.meta_requests, &meta_request->client_process_work_threaded_data.node);
1018 
1019             meta_request->client_process_work_threaded_data.scheduled = true;
1020         } else {
1021             aws_s3_meta_request_release(meta_request);
1022             meta_request = NULL;
1023         }
1024 
1025         aws_mem_release(client->allocator, meta_request_work);
1026     }
1027 
1028     /*******************/
1029     /* Step 3: Update relevant meta requests and connections. */
1030     /*******************/
1031     {
1032         AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Updating meta requests.", (void *)client);
1033         aws_s3_client_update_meta_requests_threaded(client);
1034 
1035         AWS_LOGF_DEBUG(
1036             AWS_LS_S3_CLIENT, "id=%p Updating connections, assigning requests where possible.", (void *)client);
1037         aws_s3_client_update_connections_threaded(client);
1038     }
1039 
1040     /*******************/
1041     /* Step 4: Log client stats. */
1042     /*******************/
1043     {
1044         uint32_t num_requests_tracked_requests = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
1045 
1046         uint32_t num_auto_ranged_get_network_io =
1047             s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_GET_OBJECT);
1048         uint32_t num_auto_ranged_put_network_io =
1049             s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_PUT_OBJECT);
1050         uint32_t num_auto_default_network_io =
1051             s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_DEFAULT);
1052 
1053         uint32_t num_requests_network_io =
1054             s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX);
1055 
1056         uint32_t num_requests_stream_queued_waiting =
1057             (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
1058         uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);
1059 
1060         uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
1061                                          num_requests_streaming + client->threaded_data.num_requests_being_prepared +
1062                                          client->threaded_data.request_queue_size;
1063         AWS_LOGF(
1064             s_log_level_client_stats,
1065             AWS_LS_S3_CLIENT_STATS,
1066             "id=%p Requests-in-flight(approx/exact):%d/%d  Requests-preparing:%d  Requests-queued:%d  "
1067             "Requests-network(get/put/default/total):%d/%d/%d/%d  Requests-streaming-waiting:%d  Requests-streaming:%d "
1068             " Endpoints(in-table/allocated):%d/%d",
1069             (void *)client,
1070             total_approx_requests,
1071             num_requests_tracked_requests,
1072             client->threaded_data.num_requests_being_prepared,
1073             client->threaded_data.request_queue_size,
1074             num_auto_ranged_get_network_io,
1075             num_auto_ranged_put_network_io,
1076             num_auto_default_network_io,
1077             num_requests_network_io,
1078             num_requests_stream_queued_waiting,
1079             num_requests_streaming,
1080             num_endpoints_in_table,
1081             num_endpoints_allocated);
1082     }
1083 
1084     /*******************/
1085     /* Step 5: Check for client shutdown. */
1086     /*******************/
1087     {
1088         aws_s3_client_lock_synced_data(client);
1089         client->synced_data.process_work_task_in_progress = false;
1090 
1091         /* This flag should never be set twice. If it was, that means a double-free could occur.*/
1092         AWS_ASSERT(!client->synced_data.finish_destroy);
1093 
1094         bool finish_destroy = client->synced_data.active == false &&
1095                               client->synced_data.start_destroy_executing == false &&
1096                               client->synced_data.body_streaming_elg_allocated == false &&
1097                               client->synced_data.process_work_task_scheduled == false &&
1098                               client->synced_data.process_work_task_in_progress == false &&
1099                               client->synced_data.num_endpoints_allocated == 0;
1100 
1101         client->synced_data.finish_destroy = finish_destroy;
1102 
1103         if (!client->synced_data.active) {
1104             AWS_LOGF_DEBUG(
1105                 AWS_LS_S3_CLIENT,
1106                 "id=%p Client shutdown progress: starting_destroy_executing=%d  body_streaming_elg_allocated=%d  "
1107                 "process_work_task_scheduled=%d  process_work_task_in_progress=%d  num_endpoints_allocated=%d "
1108                 "finish_destroy=%d",
1109                 (void *)client,
1110                 (int)client->synced_data.start_destroy_executing,
1111                 (int)client->synced_data.body_streaming_elg_allocated,
1112                 (int)client->synced_data.process_work_task_scheduled,
1113                 (int)client->synced_data.process_work_task_in_progress,
1114                 (int)client->synced_data.num_endpoints_allocated,
1115                 (int)client->synced_data.finish_destroy);
1116         }
1117 
1118         aws_s3_client_unlock_synced_data(client);
1119 
1120         if (finish_destroy) {
1121             client->vtable->finish_destroy(client);
1122         }
1123     }
1124 }
1125 
1126 static void s_s3_client_prepare_callback_queue_request(
1127     struct aws_s3_meta_request *meta_request,
1128     struct aws_s3_request *request,
1129     int error_code,
1130     void *user_data);
1131 
aws_s3_client_update_meta_requests_threaded(struct aws_s3_client * client)1132 void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
1133     AWS_PRECONDITION(client);
1134 
1135     const uint32_t max_requests_in_flight = aws_s3_client_get_max_requests_in_flight(client);
1136     const uint32_t max_requests_prepare = aws_s3_client_get_max_requests_prepare(client);
1137 
1138     struct aws_linked_list meta_requests_work_remaining;
1139     aws_linked_list_init(&meta_requests_work_remaining);
1140 
1141     uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
1142 
1143     const uint32_t pass_flags[] = {
1144         AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
1145         0,
1146     };
1147 
1148     const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);
1149 
1150     for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {
1151 
1152         /* While:
1153          *     * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
1154          * preparation stage.
1155          *     * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
1156          *     * There are meta requests to get requests from.
1157          *
1158          * Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
1159          * etc.) for sending.
1160          */
1161         while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
1162                    max_requests_prepare &&
1163                num_requests_in_flight < max_requests_in_flight &&
1164                !aws_linked_list_empty(&client->threaded_data.meta_requests)) {
1165 
1166             struct aws_linked_list_node *meta_request_node =
1167                 aws_linked_list_begin(&client->threaded_data.meta_requests);
1168             struct aws_s3_meta_request *meta_request =
1169                 AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);
1170 
1171             struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1172             AWS_ASSERT(endpoint != NULL);
1173 
1174             AWS_ASSERT(client->vtable->get_host_address_count);
1175             size_t num_known_vips = client->vtable->get_host_address_count(
1176                 client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
1177 
1178             /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
1179              * ramping up requests just yet. If there is already enough in the queue for one address (even if those
1180              * aren't for this particular endpoint) we skip over this meta request for now. */
1181             if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
1182                                         client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
1183                 aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
1184                 aws_linked_list_push_back(
1185                     &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
1186                 continue;
1187             }
1188 
1189             struct aws_s3_request *request = NULL;
1190 
1191             /* Try to grab the next request from the meta request. */
1192             bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);
1193 
1194             if (work_remaining) {
1195                 /* If there is work remaining, but we didn't get a request back, take the meta request out of the
1196                  * list so that we don't use it again during this function, with the intention of putting it back in
1197                  * the list before this function ends. */
1198                 if (request == NULL) {
1199                     aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
1200                     aws_linked_list_push_back(
1201                         &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
1202                 } else {
1203                     request->tracked_by_client = true;
1204 
1205                     ++client->threaded_data.num_requests_being_prepared;
1206 
1207                     num_requests_in_flight =
1208                         (uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;
1209 
1210                     aws_s3_meta_request_prepare_request(
1211                         meta_request, request, s_s3_client_prepare_callback_queue_request, client);
1212                 }
1213             } else {
1214                 s_s3_client_remove_meta_request_threaded(client, meta_request);
1215             }
1216         }
1217 
1218         aws_linked_list_move_all_front(&client->threaded_data.meta_requests, &meta_requests_work_remaining);
1219     }
1220 }
1221 
s_s3_client_prepare_callback_queue_request(struct aws_s3_meta_request * meta_request,struct aws_s3_request * request,int error_code,void * user_data)1222 static void s_s3_client_prepare_callback_queue_request(
1223     struct aws_s3_meta_request *meta_request,
1224     struct aws_s3_request *request,
1225     int error_code,
1226     void *user_data) {
1227     AWS_PRECONDITION(meta_request);
1228     AWS_PRECONDITION(request);
1229 
1230     struct aws_s3_client *client = user_data;
1231     AWS_PRECONDITION(client);
1232 
1233     if (error_code != AWS_ERROR_SUCCESS) {
1234         aws_s3_meta_request_finished_request(meta_request, request, error_code);
1235 
1236         aws_s3_request_release(request);
1237         request = NULL;
1238     }
1239 
1240     aws_s3_client_lock_synced_data(client);
1241 
1242     if (error_code == AWS_ERROR_SUCCESS) {
1243         aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node);
1244     } else {
1245         ++client->synced_data.num_failed_prepare_requests;
1246     }
1247 
1248     s_s3_client_schedule_process_work_synced(client);
1249     aws_s3_client_unlock_synced_data(client);
1250 }
1251 
aws_s3_client_update_connections_threaded(struct aws_s3_client * client)1252 void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
1253     AWS_PRECONDITION(client);
1254     AWS_PRECONDITION(client->vtable);
1255 
1256     struct aws_linked_list left_over_requests;
1257     aws_linked_list_init(&left_over_requests);
1258 
1259     while (s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX) <
1260                aws_s3_client_get_max_active_connections(client, NULL) &&
1261            !aws_linked_list_empty(&client->threaded_data.request_queue)) {
1262 
1263         struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
1264         const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, request->meta_request);
1265 
1266         /* Unless the request is marked "always send", if this meta request has a finish result, then finish the request
1267          * now and release it. */
1268         if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
1269             aws_s3_meta_request_finished_request(request->meta_request, request, AWS_ERROR_S3_CANCELED);
1270 
1271             aws_s3_request_release(request);
1272             request = NULL;
1273         } else if (
1274             s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
1275             s_s3_client_create_connection_for_request(client, request);
1276         } else {
1277             /* Push the request into the left-over list to be used in a future call of this function. */
1278             aws_linked_list_push_back(&left_over_requests, &request->node);
1279         }
1280     }
1281 
1282     aws_s3_client_queue_requests_threaded(client, &left_over_requests, true);
1283 }
1284 
1285 static void s_s3_client_acquired_retry_token(
1286     struct aws_retry_strategy *retry_strategy,
1287     int error_code,
1288     struct aws_retry_token *token,
1289     void *user_data);
1290 
1291 static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data);
1292 
1293 static void s_s3_client_create_connection_for_request_default(
1294     struct aws_s3_client *client,
1295     struct aws_s3_request *request);
1296 
s_s3_client_create_connection_for_request(struct aws_s3_client * client,struct aws_s3_request * request)1297 static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request) {
1298     AWS_PRECONDITION(client);
1299     AWS_PRECONDITION(client->vtable);
1300 
1301     if (client->vtable->create_connection_for_request) {
1302         client->vtable->create_connection_for_request(client, request);
1303         return;
1304     }
1305 
1306     s_s3_client_create_connection_for_request_default(client, request);
1307 }
1308 
s_s3_client_create_connection_for_request_default(struct aws_s3_client * client,struct aws_s3_request * request)1309 static void s_s3_client_create_connection_for_request_default(
1310     struct aws_s3_client *client,
1311     struct aws_s3_request *request) {
1312     AWS_PRECONDITION(client);
1313     AWS_PRECONDITION(request);
1314 
1315     struct aws_s3_meta_request *meta_request = request->meta_request;
1316     AWS_PRECONDITION(meta_request);
1317 
1318     aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
1319 
1320     struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));
1321 
1322     connection->endpoint = aws_s3_endpoint_acquire(meta_request->endpoint);
1323     connection->request = request;
1324 
1325     struct aws_byte_cursor host_header_value;
1326     AWS_ZERO_STRUCT(host_header_value);
1327 
1328     struct aws_http_headers *message_headers = aws_http_message_get_headers(meta_request->initial_request_message);
1329     AWS_ASSERT(message_headers);
1330 
1331     int get_header_result = aws_http_headers_get(message_headers, g_host_header_name, &host_header_value);
1332     AWS_ASSERT(get_header_result == AWS_OP_SUCCESS);
1333     (void)get_header_result;
1334 
1335     if (aws_retry_strategy_acquire_retry_token(
1336             client->retry_strategy, &host_header_value, s_s3_client_acquired_retry_token, connection, 0)) {
1337 
1338         AWS_LOGF_ERROR(
1339             AWS_LS_S3_CLIENT,
1340             "id=%p Client could not acquire retry token for request %p due to error %d (%s)",
1341             (void *)client,
1342             (void *)request,
1343             aws_last_error_or_unknown(),
1344             aws_error_str(aws_last_error_or_unknown()));
1345 
1346         goto reset_connection;
1347     }
1348 
1349     return;
1350 
1351 reset_connection:
1352 
1353     aws_s3_client_notify_connection_finished(
1354         client, connection, aws_last_error_or_unknown(), AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1355 }
1356 
s_s3_client_acquired_retry_token(struct aws_retry_strategy * retry_strategy,int error_code,struct aws_retry_token * token,void * user_data)1357 static void s_s3_client_acquired_retry_token(
1358     struct aws_retry_strategy *retry_strategy,
1359     int error_code,
1360     struct aws_retry_token *token,
1361     void *user_data) {
1362 
1363     AWS_PRECONDITION(retry_strategy);
1364     (void)retry_strategy;
1365 
1366     struct aws_s3_connection *connection = user_data;
1367     AWS_PRECONDITION(connection);
1368 
1369     struct aws_s3_request *request = connection->request;
1370     AWS_PRECONDITION(request);
1371 
1372     struct aws_s3_meta_request *meta_request = request->meta_request;
1373     AWS_PRECONDITION(meta_request);
1374 
1375     struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1376     AWS_ASSERT(endpoint != NULL);
1377 
1378     struct aws_s3_client *client = endpoint->user_data;
1379     AWS_ASSERT(client != NULL);
1380 
1381     if (error_code != AWS_ERROR_SUCCESS) {
1382         AWS_LOGF_ERROR(
1383             AWS_LS_S3_CLIENT,
1384             "id=%p Client could not get retry token for connection %p processing request %p due to error %d (%s)",
1385             (void *)client,
1386             (void *)connection,
1387             (void *)request,
1388             error_code,
1389             aws_error_str(error_code));
1390 
1391         goto error_clean_up;
1392     }
1393 
1394     AWS_ASSERT(token);
1395 
1396     connection->retry_token = token;
1397 
1398     AWS_ASSERT(client->vtable->acquire_http_connection);
1399 
1400     client->vtable->acquire_http_connection(
1401         endpoint->http_connection_manager, s_s3_client_on_acquire_http_connection, connection);
1402 
1403     return;
1404 
1405 error_clean_up:
1406 
1407     aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1408 }
1409 
s_s3_client_on_acquire_http_connection(struct aws_http_connection * incoming_http_connection,int error_code,void * user_data)1410 static void s_s3_client_on_acquire_http_connection(
1411     struct aws_http_connection *incoming_http_connection,
1412     int error_code,
1413     void *user_data) {
1414 
1415     struct aws_s3_connection *connection = user_data;
1416     AWS_PRECONDITION(connection);
1417 
1418     struct aws_s3_request *request = connection->request;
1419     AWS_PRECONDITION(request);
1420 
1421     struct aws_s3_meta_request *meta_request = request->meta_request;
1422     AWS_PRECONDITION(meta_request);
1423 
1424     struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1425     AWS_ASSERT(endpoint != NULL);
1426 
1427     struct aws_s3_client *client = endpoint->user_data;
1428     AWS_ASSERT(client != NULL);
1429 
1430     if (error_code != AWS_ERROR_SUCCESS) {
1431         AWS_LOGF_ERROR(
1432             AWS_LS_S3_ENDPOINT,
1433             "id=%p: Could not acquire connection due to error code %d (%s)",
1434             (void *)endpoint,
1435             error_code,
1436             aws_error_str(error_code));
1437 
1438         if (error_code == AWS_IO_DNS_INVALID_NAME) {
1439             goto error_fail;
1440         }
1441 
1442         goto error_retry;
1443     }
1444 
1445     connection->http_connection = incoming_http_connection;
1446     aws_s3_meta_request_send_request(meta_request, connection);
1447     return;
1448 
1449 error_retry:
1450 
1451     aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_RETRY);
1452     return;
1453 
1454 error_fail:
1455 
1456     aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1457 }
1458 
1459 /* Called by aws_s3_meta_request when it has finished using this connection for a single request. */
aws_s3_client_notify_connection_finished(struct aws_s3_client * client,struct aws_s3_connection * connection,int error_code,enum aws_s3_connection_finish_code finish_code)1460 void aws_s3_client_notify_connection_finished(
1461     struct aws_s3_client *client,
1462     struct aws_s3_connection *connection,
1463     int error_code,
1464     enum aws_s3_connection_finish_code finish_code) {
1465     AWS_PRECONDITION(client);
1466     AWS_PRECONDITION(connection);
1467 
1468     struct aws_s3_request *request = connection->request;
1469     AWS_PRECONDITION(request);
1470 
1471     struct aws_s3_meta_request *meta_request = request->meta_request;
1472 
1473     AWS_PRECONDITION(meta_request);
1474     AWS_PRECONDITION(meta_request->initial_request_message);
1475 
1476     struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1477     AWS_PRECONDITION(endpoint);
1478 
1479     /* If we're trying to setup a retry... */
1480     if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_RETRY) {
1481 
1482         if (connection->retry_token == NULL) {
1483             AWS_LOGF_ERROR(
1484                 AWS_LS_S3_CLIENT,
1485                 "id=%p Client could not schedule retry of request %p for meta request %p",
1486                 (void *)client,
1487                 (void *)request,
1488                 (void *)meta_request);
1489 
1490             goto reset_connection;
1491         }
1492 
1493         if (aws_s3_meta_request_is_finished(meta_request)) {
1494             AWS_LOGF_DEBUG(
1495                 AWS_LS_S3_CLIENT,
1496                 "id=%p Client not scheduling retry of request %p for meta request %p with token %p because meta "
1497                 "request has been flagged as finished.",
1498                 (void *)client,
1499                 (void *)request,
1500                 (void *)meta_request,
1501                 (void *)connection->retry_token);
1502 
1503             goto reset_connection;
1504         }
1505 
1506         AWS_LOGF_DEBUG(
1507             AWS_LS_S3_CLIENT,
1508             "id=%p Client scheduling retry of request %p for meta request %p with token %p.",
1509             (void *)client,
1510             (void *)request,
1511             (void *)meta_request,
1512             (void *)connection->retry_token);
1513 
1514         enum aws_retry_error_type error_type = AWS_RETRY_ERROR_TYPE_TRANSIENT;
1515 
1516         switch (error_code) {
1517             case AWS_ERROR_S3_INTERNAL_ERROR:
1518                 error_type = AWS_RETRY_ERROR_TYPE_SERVER_ERROR;
1519                 break;
1520 
1521             case AWS_ERROR_S3_SLOW_DOWN:
1522                 error_type = AWS_RETRY_ERROR_TYPE_THROTTLING;
1523                 break;
1524         }
1525 
1526         if (connection->http_connection != NULL) {
1527             AWS_ASSERT(endpoint->http_connection_manager);
1528 
1529             aws_http_connection_manager_release_connection(
1530                 endpoint->http_connection_manager, connection->http_connection);
1531 
1532             connection->http_connection = NULL;
1533         }
1534 
1535         /* Ask the retry strategy to schedule a retry of the request. */
1536         if (aws_retry_strategy_schedule_retry(
1537                 connection->retry_token, error_type, s_s3_client_retry_ready, connection)) {
1538             error_code = aws_last_error_or_unknown();
1539 
1540             AWS_LOGF_ERROR(
1541                 AWS_LS_S3_CLIENT,
1542                 "id=%p Client could not retry request %p for meta request %p with token %p due to error %d (%s)",
1543                 (void *)client,
1544                 (void *)request,
1545                 (void *)meta_request,
1546                 (void *)connection->retry_token,
1547                 error_code,
1548                 aws_error_str(error_code));
1549 
1550             goto reset_connection;
1551         }
1552 
1553         return;
1554     }
1555 
1556 reset_connection:
1557 
1558     if (connection->retry_token != NULL) {
1559         /* If we have a retry token and successfully finished, record that success. */
1560         if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
1561             aws_retry_token_record_success(connection->retry_token);
1562         }
1563 
1564         aws_retry_token_release(connection->retry_token);
1565         connection->retry_token = NULL;
1566     }
1567 
1568     /* If we weren't successful, and we're here, that means this failure is not eligible for a retry. So finish the
1569      * request, and close our HTTP connection. */
1570     if (finish_code != AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
1571         if (connection->http_connection != NULL) {
1572             aws_http_connection_close(connection->http_connection);
1573         }
1574     }
1575 
1576     aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
1577 
1578     aws_s3_meta_request_finished_request(meta_request, request, error_code);
1579 
1580     if (connection->http_connection != NULL) {
1581         AWS_ASSERT(endpoint->http_connection_manager);
1582 
1583         aws_http_connection_manager_release_connection(endpoint->http_connection_manager, connection->http_connection);
1584 
1585         connection->http_connection = NULL;
1586     }
1587 
1588     if (connection->request != NULL) {
1589         aws_s3_request_release(connection->request);
1590         connection->request = NULL;
1591     }
1592 
1593     aws_retry_token_release(connection->retry_token);
1594     connection->retry_token = NULL;
1595 
1596     aws_s3_endpoint_release(connection->endpoint);
1597     connection->endpoint = NULL;
1598 
1599     aws_mem_release(client->allocator, connection);
1600     connection = NULL;
1601 
1602     aws_s3_client_lock_synced_data(client);
1603     s_s3_client_schedule_process_work_synced(client);
1604     aws_s3_client_unlock_synced_data(client);
1605 }
1606 
1607 static void s_s3_client_prepare_request_callback_retry_request(
1608     struct aws_s3_meta_request *meta_request,
1609     struct aws_s3_request *request,
1610     int error_code,
1611     void *user_data);
1612 
s_s3_client_retry_ready(struct aws_retry_token * token,int error_code,void * user_data)1613 static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data) {
1614     AWS_PRECONDITION(token);
1615     (void)token;
1616 
1617     struct aws_s3_connection *connection = user_data;
1618     AWS_PRECONDITION(connection);
1619 
1620     struct aws_s3_request *request = connection->request;
1621     AWS_PRECONDITION(request);
1622 
1623     struct aws_s3_meta_request *meta_request = request->meta_request;
1624     AWS_PRECONDITION(meta_request);
1625 
1626     struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1627     AWS_PRECONDITION(endpoint);
1628 
1629     struct aws_s3_client *client = endpoint->user_data;
1630     AWS_PRECONDITION(client);
1631 
1632     /* If we couldn't retry this request, then bail on the entire meta request. */
1633     if (error_code != AWS_ERROR_SUCCESS) {
1634 
1635         AWS_LOGF_ERROR(
1636             AWS_LS_S3_CLIENT,
1637             "id=%p Client could not retry request %p for meta request %p due to error %d (%s)",
1638             (void *)client,
1639             (void *)meta_request,
1640             (void *)request,
1641             error_code,
1642             aws_error_str(error_code));
1643 
1644         goto error_clean_up;
1645     }
1646 
1647     AWS_LOGF_DEBUG(
1648         AWS_LS_S3_META_REQUEST,
1649         "id=%p Client retrying request %p for meta request %p on connection %p with retry token %p",
1650         (void *)client,
1651         (void *)request,
1652         (void *)meta_request,
1653         (void *)connection,
1654         (void *)connection->retry_token);
1655 
1656     aws_s3_meta_request_prepare_request(
1657         meta_request, request, s_s3_client_prepare_request_callback_retry_request, connection);
1658 
1659     return;
1660 
1661 error_clean_up:
1662 
1663     aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1664 }
1665 
s_s3_client_prepare_request_callback_retry_request(struct aws_s3_meta_request * meta_request,struct aws_s3_request * request,int error_code,void * user_data)1666 static void s_s3_client_prepare_request_callback_retry_request(
1667     struct aws_s3_meta_request *meta_request,
1668     struct aws_s3_request *request,
1669     int error_code,
1670     void *user_data) {
1671     AWS_PRECONDITION(meta_request);
1672     (void)meta_request;
1673 
1674     AWS_PRECONDITION(request);
1675     (void)request;
1676 
1677     struct aws_s3_connection *connection = user_data;
1678     AWS_PRECONDITION(connection);
1679 
1680     struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1681     AWS_ASSERT(endpoint != NULL);
1682 
1683     struct aws_s3_client *client = endpoint->user_data;
1684     AWS_ASSERT(client != NULL);
1685 
1686     if (error_code == AWS_ERROR_SUCCESS) {
1687         AWS_ASSERT(connection->retry_token);
1688 
1689         s_s3_client_acquired_retry_token(
1690             client->retry_strategy, AWS_ERROR_SUCCESS, connection->retry_token, connection);
1691     } else {
1692         aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1693     }
1694 }
1695 
1696 /* Called by aws_s3_request when it has finished being destroyed */
aws_s3_client_notify_request_destroyed(struct aws_s3_client * client,struct aws_s3_request * request)1697 void aws_s3_client_notify_request_destroyed(struct aws_s3_client *client, struct aws_s3_request *request) {
1698     AWS_PRECONDITION(client);
1699     AWS_PRECONDITION(request);
1700 
1701     if (request->tracked_by_client) {
1702         aws_s3_client_lock_synced_data(client);
1703         aws_atomic_fetch_sub(&client->stats.num_requests_in_flight, 1);
1704         s_s3_client_schedule_process_work_synced(client);
1705         aws_s3_client_unlock_synced_data(client);
1706     }
1707 }
1708