1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include "aws/s3/private/s3_auto_ranged_get.h"
7 #include "aws/s3/private/s3_auto_ranged_put.h"
8 #include "aws/s3/private/s3_client_impl.h"
9 #include "aws/s3/private/s3_default_meta_request.h"
10 #include "aws/s3/private/s3_meta_request_impl.h"
11 #include "aws/s3/private/s3_util.h"
12
13 #include <aws/auth/credentials.h>
14 #include <aws/common/assert.h>
15 #include <aws/common/atomics.h>
16 #include <aws/common/clock.h>
17 #include <aws/common/device_random.h>
18 #include <aws/common/environment.h>
19 #include <aws/common/string.h>
20 #include <aws/common/system_info.h>
21 #include <aws/http/connection.h>
22 #include <aws/http/connection_manager.h>
23 #include <aws/http/proxy.h>
24 #include <aws/http/request_response.h>
25 #include <aws/io/channel_bootstrap.h>
26 #include <aws/io/event_loop.h>
27 #include <aws/io/host_resolver.h>
28 #include <aws/io/retry_strategy.h>
29 #include <aws/io/socket.h>
30 #include <aws/io/stream.h>
31 #include <aws/io/tls_channel_handler.h>
32 #include <aws/io/uri.h>
33
34 #include <inttypes.h>
35 #include <math.h>
36
37 #if _MSC_VER
38 # pragma warning(disable : 4232) /* function pointer to dll symbol */
39 #endif /* _MSC_VER */
40
41 struct aws_s3_meta_request_work {
42 struct aws_linked_list_node node;
43 struct aws_s3_meta_request *meta_request;
44 };
45
46 static const enum aws_log_level s_log_level_client_stats = AWS_LL_INFO;
47
48 static const uint32_t s_max_requests_multiplier = 4;
49
50 /* TODO Provide analysis on origins of this value. */
51 static const double s_throughput_per_vip_gbps = 4.0;
52
53 /* Preferred amount of active connections per meta request type. */
54 const uint32_t g_num_conns_per_vip_meta_request_look_up[AWS_S3_META_REQUEST_TYPE_MAX] = {
55 10, /* AWS_S3_META_REQUEST_TYPE_DEFAULT */
56 10, /* AWS_S3_META_REQUEST_TYPE_GET_OBJECT */
57 10, /* AWS_S3_META_REQUEST_TYPE_PUT_OBJECT */
58 };
59
60 /* Should be max of s_num_conns_per_vip_meta_request_look_up */
61 const uint32_t g_max_num_connections_per_vip = 10;
62
63 /**
64 * Default part size is 8 MB to reach the best performance from the experiments we had.
65 * Default max part size is SIZE_MAX at 32bit build, which is around 4GB, which is 5GB at 64bit build.
66 * The server limit is 5GB, but object size limit is 5TB for now. We should be good enough for all the case.
67 * For upload, the max number of parts is 10000, which will limits the object size to 40TB for 32bit and 50TB for
68 * 64bit.
69 * TODO Provide more information on other values.
70 */
71 static const size_t s_default_part_size = 8 * 1024 * 1024;
72 static const size_t s_default_max_part_size = SIZE_MAX < 5000000000000ULL ? SIZE_MAX : 5000000000000ULL;
73 static const double s_default_throughput_target_gbps = 10.0;
74 static const uint32_t s_default_max_retries = 5;
75 static size_t s_dns_host_address_ttl_seconds = 5 * 60;
76
/* Ref-count-zero callback of the client (registered through aws_ref_count_init). */
static void s_s3_client_start_destroy(void *user_data);

/* Default finish_destroy implementation; called by s_s3_client_process_work_default once all shutdown criteria
 * have been met. */
static void s_s3_client_finish_destroy_default(struct aws_s3_client *client);

/* Shutdown-complete callback of the body streaming event loop group. */
static void s_s3_client_body_streaming_elg_shutdown(void *user_data);

/* Default implementation of the endpoint ref-count-zero hook. */
static bool s_s3_client_endpoint_ref_count_zero(struct aws_s3_endpoint *endpoint);

/* Default implementation of the endpoint shutdown hook. */
static void s_s3_client_endpoint_shutdown_callback(void *user_data);

static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request);

/* Handles the HTTP connection (or error) delivered by acquire_http_connection. */
static void s_s3_client_on_acquire_http_connection(
    struct aws_http_connection *http_connection,
    int error_code,
    void *user_data);

static void s_s3_client_push_meta_request_synced(
    struct aws_s3_client *client,
    struct aws_s3_meta_request *meta_request);

/* Schedules the task that processes work by calling the corresponding vtable function. */
static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client);

/* Default vtable implementation for scheduling the processing of work. */
static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client);

/* The task function that actually processes work. */
static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status);

static void s_s3_client_process_work_default(struct aws_s3_client *client);

/* Default factory used to create a meta request from the given options. */
static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
    struct aws_s3_client *client,
    const struct aws_s3_meta_request_options *options);
117
118 static struct aws_s3_client_vtable s_s3_client_default_vtable = {
119 .meta_request_factory = s_s3_client_meta_request_factory_default,
120 .acquire_http_connection = aws_http_connection_manager_acquire_connection,
121 .get_host_address_count = aws_host_resolver_get_host_address_count,
122 .schedule_process_work_synced = s_s3_client_schedule_process_work_synced_default,
123 .process_work = s_s3_client_process_work_default,
124 .endpoint_ref_count_zero = s_s3_client_endpoint_ref_count_zero,
125 .endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
126 .finish_destroy = s_s3_client_finish_destroy_default,
127 };
128
aws_s3_set_dns_ttl(size_t ttl)129 void aws_s3_set_dns_ttl(size_t ttl) {
130 s_dns_host_address_ttl_seconds = ttl;
131 }
132
133 /* Returns the max number of connections allowed.
134 *
135 * When meta request is NULL, this will return the overall allowed number of connections.
136 *
137 * If meta_request is not NULL, this will give the max number of connections allowed for that meta request type on
138 * thatendpoint.
139 */
aws_s3_client_get_max_active_connections(struct aws_s3_client * client,struct aws_s3_meta_request * meta_request)140 uint32_t aws_s3_client_get_max_active_connections(
141 struct aws_s3_client *client,
142 struct aws_s3_meta_request *meta_request) {
143 AWS_PRECONDITION(client);
144
145 uint32_t num_connections_per_vip = g_max_num_connections_per_vip;
146 uint32_t num_vips = client->ideal_vip_count;
147
148 if (meta_request != NULL) {
149 num_connections_per_vip = g_num_conns_per_vip_meta_request_look_up[meta_request->type];
150
151 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
152 AWS_ASSERT(endpoint != NULL);
153
154 AWS_ASSERT(client->vtable->get_host_address_count);
155 size_t num_known_vips = client->vtable->get_host_address_count(
156 client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
157
158 /* If the number of known vips is less than our ideal VIP count, clamp it. */
159 if (num_known_vips < (size_t)num_vips) {
160 num_vips = (uint32_t)num_known_vips;
161 }
162 }
163
164 /* We always want to allow for at least one VIP worth of connections. */
165 if (num_vips == 0) {
166 num_vips = 1;
167 }
168
169 uint32_t max_active_connections = num_vips * num_connections_per_vip;
170
171 if (client->max_active_connections_override > 0 &&
172 client->max_active_connections_override < max_active_connections) {
173 max_active_connections = client->max_active_connections_override;
174 }
175
176 return max_active_connections;
177 }
178
179 /* Returns the max number of requests allowed to be in memory */
aws_s3_client_get_max_requests_in_flight(struct aws_s3_client * client)180 uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
181 AWS_PRECONDITION(client);
182 return aws_s3_client_get_max_active_connections(client, NULL) * s_max_requests_multiplier;
183 }
184
185 /* Returns the max number of requests that should be in preparation stage (ie: reading from a stream, being signed,
186 * etc.) */
aws_s3_client_get_max_requests_prepare(struct aws_s3_client * client)187 uint32_t aws_s3_client_get_max_requests_prepare(struct aws_s3_client *client) {
188 return aws_s3_client_get_max_active_connections(client, NULL);
189 }
190
s_s3_client_get_num_requests_network_io(struct aws_s3_client * client,enum aws_s3_meta_request_type meta_request_type)191 static uint32_t s_s3_client_get_num_requests_network_io(
192 struct aws_s3_client *client,
193 enum aws_s3_meta_request_type meta_request_type) {
194 AWS_PRECONDITION(client);
195
196 uint32_t num_requests_network_io = 0;
197
198 if (meta_request_type == AWS_S3_META_REQUEST_TYPE_MAX) {
199 for (uint32_t i = 0; i < AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
200 num_requests_network_io += (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[i]);
201 }
202 } else {
203 num_requests_network_io =
204 (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[meta_request_type]);
205 }
206
207 return num_requests_network_io;
208 }
209
aws_s3_client_lock_synced_data(struct aws_s3_client * client)210 void aws_s3_client_lock_synced_data(struct aws_s3_client *client) {
211 aws_mutex_lock(&client->synced_data.lock);
212 }
213
aws_s3_client_unlock_synced_data(struct aws_s3_client * client)214 void aws_s3_client_unlock_synced_data(struct aws_s3_client *client) {
215 aws_mutex_unlock(&client->synced_data.lock);
216 }
217
aws_s3_client_new(struct aws_allocator * allocator,const struct aws_s3_client_config * client_config)218 struct aws_s3_client *aws_s3_client_new(
219 struct aws_allocator *allocator,
220 const struct aws_s3_client_config *client_config) {
221
222 AWS_PRECONDITION(allocator);
223 AWS_PRECONDITION(client_config);
224
225 if (client_config->client_bootstrap == NULL) {
226 AWS_LOGF_ERROR(
227 AWS_LS_S3_CLIENT,
228 "Cannot create client from client_config; client_bootstrap provided in options is invalid.");
229 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
230 return NULL;
231 }
232
233 /* Cannot be less than zero. If zero, use default. */
234 if (client_config->throughput_target_gbps < 0.0) {
235 AWS_LOGF_ERROR(
236 AWS_LS_S3_CLIENT,
237 "Cannot create client from client_config; throughput_target_gbps cannot less than or equal to 0.");
238 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
239 return NULL;
240 }
241
242 #ifdef BYO_CRYPTO
243 if (client_config->tls_mode == AWS_MR_TLS_ENABLED && client_config->tls_connection_options == NULL) {
244 AWS_LOGF_ERROR(
245 AWS_LS_S3_CLIENT,
246 "Cannot create client from client_config; when using BYO_CRYPTO, tls_connection_options can not be "
247 "NULL when TLS is enabled.");
248 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
249 return NULL;
250 }
251 #endif
252
253 struct aws_s3_client *client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client));
254
255 client->allocator = allocator;
256 client->vtable = &s_s3_client_default_vtable;
257
258 aws_ref_count_init(&client->ref_count, client, (aws_simple_completion_callback *)s_s3_client_start_destroy);
259
260 if (aws_mutex_init(&client->synced_data.lock) != AWS_OP_SUCCESS) {
261 goto lock_init_fail;
262 }
263
264 aws_linked_list_init(&client->synced_data.pending_meta_request_work);
265 aws_linked_list_init(&client->synced_data.prepared_requests);
266
267 aws_linked_list_init(&client->threaded_data.meta_requests);
268 aws_linked_list_init(&client->threaded_data.request_queue);
269
270 aws_atomic_init_int(&client->stats.num_requests_in_flight, 0);
271
272 for (uint32_t i = 0; i < (uint32_t)AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
273 aws_atomic_init_int(&client->stats.num_requests_network_io[i], 0);
274 }
275
276 aws_atomic_init_int(&client->stats.num_requests_stream_queued_waiting, 0);
277 aws_atomic_init_int(&client->stats.num_requests_streaming, 0);
278
279 *((uint32_t *)&client->max_active_connections_override) = client_config->max_active_connections_override;
280
281 /* Store our client bootstrap. */
282 client->client_bootstrap = aws_client_bootstrap_acquire(client_config->client_bootstrap);
283
284 struct aws_event_loop_group *event_loop_group = client_config->client_bootstrap->event_loop_group;
285 aws_event_loop_group_acquire(event_loop_group);
286
287 client->process_work_event_loop = aws_event_loop_group_get_next_loop(event_loop_group);
288
289 /* Set up body streaming ELG */
290 {
291 uint16_t num_event_loops =
292 (uint16_t)aws_array_list_length(&client->client_bootstrap->event_loop_group->event_loops);
293 uint16_t num_streaming_threads = num_event_loops;
294
295 if (num_streaming_threads < 1) {
296 num_streaming_threads = 1;
297 }
298
299 struct aws_shutdown_callback_options body_streaming_elg_shutdown_options = {
300 .shutdown_callback_fn = s_s3_client_body_streaming_elg_shutdown,
301 .shutdown_callback_user_data = client,
302 };
303
304 if (aws_get_cpu_group_count() > 1) {
305 client->body_streaming_elg = aws_event_loop_group_new_default_pinned_to_cpu_group(
306 client->allocator, num_streaming_threads, 1, &body_streaming_elg_shutdown_options);
307 } else {
308 client->body_streaming_elg = aws_event_loop_group_new_default(
309 client->allocator, num_streaming_threads, &body_streaming_elg_shutdown_options);
310 }
311 if (!client->body_streaming_elg) {
312 /* Fail to create elg, we should fail the call */
313 goto elg_create_fail;
314 }
315 client->synced_data.body_streaming_elg_allocated = true;
316 }
317
318 /* Make a copy of the region string. */
319 client->region = aws_string_new_from_array(allocator, client_config->region.ptr, client_config->region.len);
320
321 if (client_config->part_size != 0) {
322 *((size_t *)&client->part_size) = client_config->part_size;
323 } else {
324 *((size_t *)&client->part_size) = s_default_part_size;
325 }
326
327 if (client_config->max_part_size != 0) {
328 *((size_t *)&client->max_part_size) = client_config->max_part_size;
329 } else {
330 *((size_t *)&client->max_part_size) = s_default_max_part_size;
331 }
332
333 if (client_config->max_part_size < client_config->part_size) {
334 *((size_t *)&client_config->max_part_size) = client_config->part_size;
335 }
336
337 if (client_config->tls_mode == AWS_MR_TLS_ENABLED) {
338 client->tls_connection_options =
339 aws_mem_calloc(client->allocator, 1, sizeof(struct aws_tls_connection_options));
340
341 if (client_config->tls_connection_options != NULL) {
342 aws_tls_connection_options_copy(client->tls_connection_options, client_config->tls_connection_options);
343 } else {
344 #ifdef BYO_CRYPTO
345 AWS_FATAL_ASSERT(false);
346 goto on_error;
347 #else
348 struct aws_tls_ctx_options default_tls_ctx_options;
349 AWS_ZERO_STRUCT(default_tls_ctx_options);
350
351 aws_tls_ctx_options_init_default_client(&default_tls_ctx_options, allocator);
352
353 struct aws_tls_ctx *default_tls_ctx = aws_tls_client_ctx_new(allocator, &default_tls_ctx_options);
354 if (default_tls_ctx == NULL) {
355 goto on_error;
356 }
357
358 aws_tls_connection_options_init_from_ctx(client->tls_connection_options, default_tls_ctx);
359
360 aws_tls_ctx_release(default_tls_ctx);
361 aws_tls_ctx_options_clean_up(&default_tls_ctx_options);
362 #endif
363 }
364 }
365
366 if (client_config->throughput_target_gbps != 0.0) {
367 *((double *)&client->throughput_target_gbps) = client_config->throughput_target_gbps;
368 } else {
369 *((double *)&client->throughput_target_gbps) = s_default_throughput_target_gbps;
370 }
371
372 *((enum aws_s3_meta_request_compute_content_md5 *)&client->compute_content_md5) =
373 client_config->compute_content_md5;
374
375 /* Determine how many vips are ideal by dividing target-throughput by throughput-per-vip. */
376 {
377 double ideal_vip_count_double = client->throughput_target_gbps / s_throughput_per_vip_gbps;
378 *((uint32_t *)&client->ideal_vip_count) = (uint32_t)ceil(ideal_vip_count_double);
379 }
380
381 if (client_config->signing_config) {
382 client->cached_signing_config = aws_cached_signing_config_new(client->allocator, client_config->signing_config);
383 }
384
385 client->synced_data.active = true;
386
387 if (client_config->retry_strategy != NULL) {
388 aws_retry_strategy_acquire(client_config->retry_strategy);
389 client->retry_strategy = client_config->retry_strategy;
390 } else {
391 struct aws_exponential_backoff_retry_options backoff_retry_options = {
392 .el_group = client_config->client_bootstrap->event_loop_group,
393 .max_retries = s_default_max_retries,
394 };
395
396 struct aws_standard_retry_options retry_options = {
397 .backoff_retry_options = backoff_retry_options,
398 };
399
400 client->retry_strategy = aws_retry_strategy_new_standard(allocator, &retry_options);
401 }
402
403 aws_hash_table_init(
404 &client->synced_data.endpoints,
405 client->allocator,
406 10,
407 aws_hash_string,
408 aws_hash_callback_string_eq,
409 aws_hash_callback_string_destroy,
410 NULL);
411
412 /* Initialize shutdown options and tracking. */
413 client->shutdown_callback = client_config->shutdown_callback;
414 client->shutdown_callback_user_data = client_config->shutdown_callback_user_data;
415
416 return client;
417
418 on_error:
419 aws_event_loop_group_release(client->body_streaming_elg);
420 client->body_streaming_elg = NULL;
421 if (client->tls_connection_options) {
422 aws_tls_connection_options_clean_up(client->tls_connection_options);
423 aws_mem_release(client->allocator, client->tls_connection_options);
424 client->tls_connection_options = NULL;
425 }
426 elg_create_fail:
427 aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
428 aws_client_bootstrap_release(client->client_bootstrap);
429 aws_mutex_clean_up(&client->synced_data.lock);
430 lock_init_fail:
431 aws_mem_release(client->allocator, client);
432 return NULL;
433 }
434
aws_s3_client_acquire(struct aws_s3_client * client)435 void aws_s3_client_acquire(struct aws_s3_client *client) {
436 AWS_PRECONDITION(client);
437
438 aws_ref_count_acquire(&client->ref_count);
439 }
440
aws_s3_client_release(struct aws_s3_client * client)441 void aws_s3_client_release(struct aws_s3_client *client) {
442 if (client == NULL) {
443 return;
444 }
445
446 aws_ref_count_release(&client->ref_count);
447 }
448
s_s3_client_start_destroy(void * user_data)449 static void s_s3_client_start_destroy(void *user_data) {
450 struct aws_s3_client *client = user_data;
451 AWS_PRECONDITION(client);
452
453 AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client starting destruction.", (void *)client);
454
455 struct aws_linked_list local_vip_list;
456 aws_linked_list_init(&local_vip_list);
457
458 aws_s3_client_lock_synced_data(client);
459
460 client->synced_data.active = false;
461
462 /* Prevent the client from cleaning up inbetween the mutex unlock/re-lock below.*/
463 client->synced_data.start_destroy_executing = true;
464
465 aws_s3_client_unlock_synced_data(client);
466
467 aws_event_loop_group_release(client->body_streaming_elg);
468 client->body_streaming_elg = NULL;
469
470 aws_s3_client_lock_synced_data(client);
471 client->synced_data.start_destroy_executing = false;
472
473 /* Schedule the work task to clean up outstanding connections and to call s_s3_client_finish_destroy function if
474 * everything cleaning up asynchronously has finished. */
475 s_s3_client_schedule_process_work_synced(client);
476 aws_s3_client_unlock_synced_data(client);
477 }
478
s_s3_client_finish_destroy_default(struct aws_s3_client * client)479 static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
480 AWS_PRECONDITION(client);
481
482 AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client);
483
484 aws_string_destroy(client->region);
485 client->region = NULL;
486
487 if (client->tls_connection_options) {
488 aws_tls_connection_options_clean_up(client->tls_connection_options);
489 aws_mem_release(client->allocator, client->tls_connection_options);
490 client->tls_connection_options = NULL;
491 }
492
493 aws_mutex_clean_up(&client->synced_data.lock);
494
495 AWS_ASSERT(aws_linked_list_empty(&client->synced_data.pending_meta_request_work));
496 AWS_ASSERT(aws_linked_list_empty(&client->threaded_data.meta_requests));
497 aws_hash_table_clean_up(&client->synced_data.endpoints);
498
499 aws_retry_strategy_release(client->retry_strategy);
500
501 aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
502
503 aws_client_bootstrap_release(client->client_bootstrap);
504 aws_cached_signing_config_destroy(client->cached_signing_config);
505
506 aws_s3_client_shutdown_complete_callback_fn *shutdown_callback = client->shutdown_callback;
507 void *shutdown_user_data = client->shutdown_callback_user_data;
508
509 aws_mem_release(client->allocator, client);
510 client = NULL;
511
512 if (shutdown_callback != NULL) {
513 shutdown_callback(shutdown_user_data);
514 }
515 }
516
s_s3_client_body_streaming_elg_shutdown(void * user_data)517 static void s_s3_client_body_streaming_elg_shutdown(void *user_data) {
518 struct aws_s3_client *client = user_data;
519 AWS_PRECONDITION(client);
520
521 AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client body streaming ELG shutdown.", (void *)client);
522
523 aws_s3_client_lock_synced_data(client);
524 client->synced_data.body_streaming_elg_allocated = false;
525 s_s3_client_schedule_process_work_synced(client);
526 aws_s3_client_unlock_synced_data(client);
527 }
528
aws_s3_client_queue_requests_threaded(struct aws_s3_client * client,struct aws_linked_list * request_list,bool queue_front)529 uint32_t aws_s3_client_queue_requests_threaded(
530 struct aws_s3_client *client,
531 struct aws_linked_list *request_list,
532 bool queue_front) {
533 AWS_PRECONDITION(client);
534 AWS_PRECONDITION(request_list);
535
536 uint32_t request_list_size = 0;
537
538 for (struct aws_linked_list_node *node = aws_linked_list_begin(request_list);
539 node != aws_linked_list_end(request_list);
540 node = aws_linked_list_next(node)) {
541 ++request_list_size;
542 }
543
544 if (queue_front) {
545 aws_linked_list_move_all_front(&client->threaded_data.request_queue, request_list);
546 } else {
547 aws_linked_list_move_all_back(&client->threaded_data.request_queue, request_list);
548 }
549
550 client->threaded_data.request_queue_size += request_list_size;
551 return request_list_size;
552 }
553
aws_s3_client_dequeue_request_threaded(struct aws_s3_client * client)554 struct aws_s3_request *aws_s3_client_dequeue_request_threaded(struct aws_s3_client *client) {
555 AWS_PRECONDITION(client);
556
557 if (aws_linked_list_empty(&client->threaded_data.request_queue)) {
558 return NULL;
559 }
560
561 struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&client->threaded_data.request_queue);
562 struct aws_s3_request *request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
563
564 --client->threaded_data.request_queue_size;
565
566 return request;
567 }
568
569 /* Public facing make-meta-request function. */
aws_s3_client_make_meta_request(struct aws_s3_client * client,const struct aws_s3_meta_request_options * options)570 struct aws_s3_meta_request *aws_s3_client_make_meta_request(
571 struct aws_s3_client *client,
572 const struct aws_s3_meta_request_options *options) {
573
574 AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p Initiating making of meta request", (void *)client);
575
576 AWS_PRECONDITION(client);
577 AWS_PRECONDITION(client->vtable);
578 AWS_PRECONDITION(client->vtable->meta_request_factory);
579 AWS_PRECONDITION(options);
580
581 if (options->type != AWS_S3_META_REQUEST_TYPE_DEFAULT && options->type != AWS_S3_META_REQUEST_TYPE_GET_OBJECT &&
582 options->type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
583 AWS_LOGF_ERROR(
584 AWS_LS_S3_CLIENT,
585 "id=%p Cannot create meta s3 request; invalid meta request type specified.",
586 (void *)client);
587 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
588 return NULL;
589 }
590
591 if (options->message == NULL) {
592 AWS_LOGF_ERROR(
593 AWS_LS_S3_CLIENT,
594 "id=%p Cannot create meta s3 request; message provided in options is invalid.",
595 (void *)client);
596 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
597 return NULL;
598 }
599
600 struct aws_http_headers *message_headers = aws_http_message_get_headers(options->message);
601
602 if (message_headers == NULL) {
603 AWS_LOGF_ERROR(
604 AWS_LS_S3_CLIENT,
605 "id=%p Cannot create meta s3 request; message provided in options does not contain headers.",
606 (void *)client);
607 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
608 return NULL;
609 }
610
611 struct aws_byte_cursor host_header_value;
612
613 if (aws_http_headers_get(message_headers, g_host_header_name, &host_header_value)) {
614 AWS_LOGF_ERROR(
615 AWS_LS_S3_CLIENT,
616 "id=%p Cannot create meta s3 request; message provided in options does not have a 'Host' header.",
617 (void *)client);
618 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
619 return NULL;
620 }
621
622 struct aws_s3_meta_request *meta_request = client->vtable->meta_request_factory(client, options);
623
624 if (meta_request == NULL) {
625 AWS_LOGF_ERROR(AWS_LS_S3_CLIENT, "id=%p: Could not create new meta request.", (void *)client);
626 return NULL;
627 }
628
629 bool error_occurred = false;
630
631 aws_s3_client_lock_synced_data(client);
632
633 struct aws_string *endpoint_host_name = aws_string_new_from_cursor(client->allocator, &host_header_value);
634
635 struct aws_s3_endpoint *endpoint = NULL;
636 struct aws_hash_element *endpoint_hash_element = NULL;
637
638 int was_created = 0;
639
640 if (aws_hash_table_create(
641 &client->synced_data.endpoints, endpoint_host_name, &endpoint_hash_element, &was_created)) {
642 error_occurred = true;
643 goto unlock;
644 }
645
646 if (was_created) {
647 struct aws_s3_endpoint_options endpoint_options = {
648 .host_name = endpoint_host_name,
649 .ref_count_zero_callback = client->vtable->endpoint_ref_count_zero,
650 .shutdown_callback = client->vtable->endpoint_shutdown_callback,
651 .client_bootstrap = client->client_bootstrap,
652 .tls_connection_options = client->tls_connection_options,
653 .dns_host_address_ttl_seconds = s_dns_host_address_ttl_seconds,
654 .user_data = client,
655 .max_connections = aws_s3_client_get_max_active_connections(client, NULL),
656 };
657
658 endpoint = aws_s3_endpoint_new(client->allocator, &endpoint_options);
659
660 if (endpoint == NULL) {
661 aws_hash_table_remove(&client->synced_data.endpoints, endpoint_host_name, NULL, NULL);
662 error_occurred = true;
663 goto unlock;
664 }
665
666 endpoint_hash_element->value = endpoint;
667 ++client->synced_data.num_endpoints_allocated;
668 } else {
669 endpoint = aws_s3_endpoint_acquire(endpoint_hash_element->value);
670
671 aws_string_destroy(endpoint_host_name);
672 endpoint_host_name = NULL;
673 }
674
675 meta_request->endpoint = endpoint;
676
677 s_s3_client_push_meta_request_synced(client, meta_request);
678 s_s3_client_schedule_process_work_synced(client);
679
680 unlock:
681 aws_s3_client_unlock_synced_data(client);
682
683 if (error_occurred) {
684 AWS_LOGF_ERROR(
685 AWS_LS_S3_CLIENT,
686 "id=%p Could not create meta request due to error %d (%s)",
687 (void *)client,
688 aws_last_error(),
689 aws_error_str(aws_last_error()));
690
691 aws_s3_meta_request_release(meta_request);
692 meta_request = NULL;
693 } else {
694 AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p: Created meta request %p", (void *)client, (void *)meta_request);
695 }
696
697 return meta_request;
698 }
699
s_s3_client_endpoint_ref_count_zero(struct aws_s3_endpoint * endpoint)700 static bool s_s3_client_endpoint_ref_count_zero(struct aws_s3_endpoint *endpoint) {
701 AWS_PRECONDITION(endpoint);
702
703 struct aws_s3_client *client = endpoint->user_data;
704 AWS_PRECONDITION(client);
705
706 bool clean_up_endpoint = false;
707
708 aws_s3_client_lock_synced_data(client);
709
710 /* It is possible that before we were able to acquire the lock here, the critical section used for looking up the
711 * endpoint in the table and assigning it to a new meta request was called in a different thread. To handle this
712 * case, we check the ref count before removing it.*/
713 if (aws_atomic_load_int(&endpoint->ref_count.ref_count) == 0) {
714 aws_hash_table_remove(&client->synced_data.endpoints, endpoint->host_name, NULL, NULL);
715 clean_up_endpoint = true;
716 }
717
718 aws_s3_client_unlock_synced_data(client);
719
720 return clean_up_endpoint;
721 }
722
s_s3_client_endpoint_shutdown_callback(void * user_data)723 static void s_s3_client_endpoint_shutdown_callback(void *user_data) {
724 struct aws_s3_client *client = user_data;
725 AWS_PRECONDITION(client);
726
727 aws_s3_client_lock_synced_data(client);
728 --client->synced_data.num_endpoints_allocated;
729 s_s3_client_schedule_process_work_synced(client);
730 aws_s3_client_unlock_synced_data(client);
731 }
732
s_s3_client_meta_request_factory_default(struct aws_s3_client * client,const struct aws_s3_meta_request_options * options)733 static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
734 struct aws_s3_client *client,
735 const struct aws_s3_meta_request_options *options) {
736 AWS_PRECONDITION(client);
737 AWS_PRECONDITION(options);
738
739 struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(options->message);
740 AWS_ASSERT(initial_message_headers);
741
742 uint64_t content_length = 0;
743 struct aws_byte_cursor content_length_cursor;
744 bool content_length_header_found = false;
745
746 if (!aws_http_headers_get(initial_message_headers, g_content_length_header_name, &content_length_cursor)) {
747 struct aws_string *content_length_str = aws_string_new_from_cursor(client->allocator, &content_length_cursor);
748 char *content_length_str_end = NULL;
749
750 content_length = strtoull((const char *)content_length_str->bytes, &content_length_str_end, 10);
751 aws_string_destroy(content_length_str);
752
753 content_length_str = NULL;
754 content_length_header_found = true;
755 }
756
757 /* Call the appropriate meta-request new function. */
758 if (options->type == AWS_S3_META_REQUEST_TYPE_GET_OBJECT) {
759
760 /* If the initial request already has partNumber, the request is not splittable(?). Treat it as a Default
761 * request.
762 * TODO: Still need tests to verify that the request of a part is splittable or not */
763 if (aws_http_headers_has(initial_message_headers, aws_byte_cursor_from_c_str("partNumber"))) {
764 return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
765 }
766
767 return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, client->part_size, options);
768 } else if (options->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
769
770 if (!content_length_header_found) {
771 AWS_LOGF_ERROR(
772 AWS_LS_S3_META_REQUEST,
773 "Could not create auto-ranged-put meta request; there is no Content-Length header present.");
774 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
775 return NULL;
776 }
777
778 struct aws_input_stream *input_stream = aws_http_message_get_body_stream(options->message);
779
780 if (input_stream == NULL) {
781 AWS_LOGF_ERROR(
782 AWS_LS_S3_META_REQUEST, "Could not create auto-ranged-put meta request; body stream is NULL.");
783 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
784 return NULL;
785 }
786
787 size_t client_part_size = client->part_size;
788 size_t client_max_part_size = client->max_part_size;
789
790 if (client_part_size < g_s3_min_upload_part_size) {
791 AWS_LOGF_WARN(
792 AWS_LS_S3_META_REQUEST,
793 "Client config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
794 ". Using to the minimum part-size for upload.",
795 (uint64_t)client_part_size,
796 (uint64_t)g_s3_min_upload_part_size);
797
798 client_part_size = g_s3_min_upload_part_size;
799 }
800
801 if (client_max_part_size < g_s3_min_upload_part_size) {
802 AWS_LOGF_WARN(
803 AWS_LS_S3_META_REQUEST,
804 "Client config max part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
805 ". Clamping to the minimum part-size for upload.",
806 (uint64_t)client_max_part_size,
807 (uint64_t)g_s3_min_upload_part_size);
808
809 client_max_part_size = g_s3_min_upload_part_size;
810 }
811
812 if (content_length < client_part_size) {
813 return aws_s3_meta_request_default_new(
814 client->allocator,
815 client,
816 content_length,
817 client->compute_content_md5 == AWS_MR_CONTENT_MD5_ENABLED &&
818 !aws_http_headers_has(initial_message_headers, g_content_md5_header_name),
819 options);
820 }
821
822 uint64_t part_size_uint64 = content_length / (uint64_t)g_s3_max_num_upload_parts;
823
824 if (part_size_uint64 > SIZE_MAX) {
825 AWS_LOGF_ERROR(
826 AWS_LS_S3_META_REQUEST,
827 "Could not create auto-ranged-put meta request; required part size of %" PRIu64
828 " bytes is too large for platform.",
829 part_size_uint64);
830
831 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
832 return NULL;
833 }
834
835 size_t part_size = (size_t)part_size_uint64;
836
837 if (part_size > client_max_part_size) {
838 AWS_LOGF_ERROR(
839 AWS_LS_S3_META_REQUEST,
840 "Could not create auto-ranged-put meta request; required part size for put request is %" PRIu64
841 ", but current maximum part size is %" PRIu64,
842 (uint64_t)part_size,
843 (uint64_t)client_max_part_size);
844 aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
845 return NULL;
846 }
847
848 if (part_size < client_part_size) {
849 part_size = client_part_size;
850 }
851
852 uint32_t num_parts = (uint32_t)(content_length / part_size);
853
854 if ((content_length % part_size) > 0) {
855 ++num_parts;
856 }
857
858 return aws_s3_meta_request_auto_ranged_put_new(
859 client->allocator, client, part_size, content_length, num_parts, options);
860 } else if (options->type == AWS_S3_META_REQUEST_TYPE_DEFAULT) {
861 return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
862 } else {
863 AWS_FATAL_ASSERT(false);
864 }
865
866 return NULL;
867 }
868
s_s3_client_push_meta_request_synced(struct aws_s3_client * client,struct aws_s3_meta_request * meta_request)869 static void s_s3_client_push_meta_request_synced(
870 struct aws_s3_client *client,
871 struct aws_s3_meta_request *meta_request) {
872 AWS_PRECONDITION(client);
873 AWS_PRECONDITION(meta_request);
874 ASSERT_SYNCED_DATA_LOCK_HELD(client);
875
876 struct aws_s3_meta_request_work *meta_request_work =
877 aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_meta_request_work));
878
879 aws_s3_meta_request_acquire(meta_request);
880 meta_request_work->meta_request = meta_request;
881 aws_linked_list_push_back(&client->synced_data.pending_meta_request_work, &meta_request_work->node);
882 }
883
s_s3_client_schedule_process_work_synced(struct aws_s3_client * client)884 static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client) {
885 AWS_PRECONDITION(client);
886 AWS_PRECONDITION(client->vtable);
887 AWS_PRECONDITION(client->vtable->schedule_process_work_synced);
888
889 ASSERT_SYNCED_DATA_LOCK_HELD(client);
890
891 client->vtable->schedule_process_work_synced(client);
892 }
893
s_s3_client_schedule_process_work_synced_default(struct aws_s3_client * client)894 static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client) {
895 ASSERT_SYNCED_DATA_LOCK_HELD(client);
896
897 if (client->synced_data.process_work_task_scheduled) {
898 return;
899 }
900
901 aws_task_init(
902 &client->synced_data.process_work_task, s_s3_client_process_work_task, client, "s3_client_process_work_task");
903
904 aws_event_loop_schedule_task_now(client->process_work_event_loop, &client->synced_data.process_work_task);
905
906 client->synced_data.process_work_task_scheduled = true;
907 }
908
/* Requests a process-work pass for the client. Takes the synced-data lock itself, so the caller must not
 * already hold it. */
void aws_s3_client_schedule_process_work(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);

    aws_s3_client_lock_synced_data(client);
    {
        /* The "_synced" variant requires the lock to be held. */
        s_s3_client_schedule_process_work_synced(client);
    }
    aws_s3_client_unlock_synced_data(client);
}
916
s_s3_client_remove_meta_request_threaded(struct aws_s3_client * client,struct aws_s3_meta_request * meta_request)917 static void s_s3_client_remove_meta_request_threaded(
918 struct aws_s3_client *client,
919 struct aws_s3_meta_request *meta_request) {
920 AWS_PRECONDITION(client);
921 AWS_PRECONDITION(meta_request);
922 (void)client;
923
924 aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
925 meta_request->client_process_work_threaded_data.scheduled = false;
926 aws_s3_meta_request_release(meta_request);
927 }
928
929 /* Task function for trying to find a request that can be processed. */
s_s3_client_process_work_task(struct aws_task * task,void * arg,enum aws_task_status task_status)930 static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
931 AWS_PRECONDITION(task);
932 (void)task;
933 (void)task_status;
934
935 /* Client keeps a reference to the event loop group; a 'canceled' status should not happen.*/
936 AWS_ASSERT(task_status == AWS_TASK_STATUS_RUN_READY);
937
938 struct aws_s3_client *client = arg;
939 AWS_PRECONDITION(client);
940 AWS_PRECONDITION(client->vtable);
941 AWS_PRECONDITION(client->vtable->process_work);
942
943 client->vtable->process_work(client);
944 }
945
s_s3_client_process_work_default(struct aws_s3_client * client)946 static void s_s3_client_process_work_default(struct aws_s3_client *client) {
947 AWS_PRECONDITION(client);
948 AWS_PRECONDITION(client->vtable);
949 AWS_PRECONDITION(client->vtable->finish_destroy);
950
951 struct aws_linked_list meta_request_work_list;
952 aws_linked_list_init(&meta_request_work_list);
953
954 /*******************/
955 /* Step 1: Move relevant data into thread local memory. */
956 /*******************/
957 AWS_LOGF_DEBUG(
958 AWS_LS_S3_CLIENT,
959 "id=%p s_s3_client_process_work_default - Moving relevant synced_data into threaded_data.",
960 (void *)client);
961 aws_s3_client_lock_synced_data(client);
962
963 /* Once we exit this mutex, someone can reschedule this task. */
964 client->synced_data.process_work_task_scheduled = false;
965 client->synced_data.process_work_task_in_progress = true;
966
967 aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);
968
969 uint32_t num_requests_queued =
970 aws_s3_client_queue_requests_threaded(client, &client->synced_data.prepared_requests, false);
971
972 {
973 int sub_result = aws_sub_u32_checked(
974 client->threaded_data.num_requests_being_prepared,
975 num_requests_queued,
976 &client->threaded_data.num_requests_being_prepared);
977
978 AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
979 (void)sub_result;
980 }
981
982 {
983 int sub_result = aws_sub_u32_checked(
984 client->threaded_data.num_requests_being_prepared,
985 client->synced_data.num_failed_prepare_requests,
986 &client->threaded_data.num_requests_being_prepared);
987
988 client->synced_data.num_failed_prepare_requests = 0;
989
990 AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
991 (void)sub_result;
992 }
993
994 uint32_t num_endpoints_in_table = (uint32_t)aws_hash_table_get_entry_count(&client->synced_data.endpoints);
995 uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;
996
997 aws_s3_client_unlock_synced_data(client);
998
999 /*******************/
1000 /* Step 2: Push meta requests into the thread local list if they haven't already been scheduled. */
1001 /*******************/
1002 AWS_LOGF_DEBUG(
1003 AWS_LS_S3_CLIENT, "id=%p s_s3_client_process_work_default - Processing any new meta requests.", (void *)client);
1004
1005 while (!aws_linked_list_empty(&meta_request_work_list)) {
1006 struct aws_linked_list_node *node = aws_linked_list_pop_back(&meta_request_work_list);
1007 struct aws_s3_meta_request_work *meta_request_work =
1008 AWS_CONTAINER_OF(node, struct aws_s3_meta_request_work, node);
1009
1010 AWS_FATAL_ASSERT(meta_request_work != NULL);
1011 AWS_FATAL_ASSERT(meta_request_work->meta_request != NULL);
1012
1013 struct aws_s3_meta_request *meta_request = meta_request_work->meta_request;
1014
1015 if (!meta_request->client_process_work_threaded_data.scheduled) {
1016 aws_linked_list_push_back(
1017 &client->threaded_data.meta_requests, &meta_request->client_process_work_threaded_data.node);
1018
1019 meta_request->client_process_work_threaded_data.scheduled = true;
1020 } else {
1021 aws_s3_meta_request_release(meta_request);
1022 meta_request = NULL;
1023 }
1024
1025 aws_mem_release(client->allocator, meta_request_work);
1026 }
1027
1028 /*******************/
1029 /* Step 3: Update relevant meta requests and connections. */
1030 /*******************/
1031 {
1032 AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Updating meta requests.", (void *)client);
1033 aws_s3_client_update_meta_requests_threaded(client);
1034
1035 AWS_LOGF_DEBUG(
1036 AWS_LS_S3_CLIENT, "id=%p Updating connections, assigning requests where possible.", (void *)client);
1037 aws_s3_client_update_connections_threaded(client);
1038 }
1039
1040 /*******************/
1041 /* Step 4: Log client stats. */
1042 /*******************/
1043 {
1044 uint32_t num_requests_tracked_requests = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
1045
1046 uint32_t num_auto_ranged_get_network_io =
1047 s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_GET_OBJECT);
1048 uint32_t num_auto_ranged_put_network_io =
1049 s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_PUT_OBJECT);
1050 uint32_t num_auto_default_network_io =
1051 s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_DEFAULT);
1052
1053 uint32_t num_requests_network_io =
1054 s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX);
1055
1056 uint32_t num_requests_stream_queued_waiting =
1057 (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
1058 uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);
1059
1060 uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
1061 num_requests_streaming + client->threaded_data.num_requests_being_prepared +
1062 client->threaded_data.request_queue_size;
1063 AWS_LOGF(
1064 s_log_level_client_stats,
1065 AWS_LS_S3_CLIENT_STATS,
1066 "id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
1067 "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d Requests-streaming:%d "
1068 " Endpoints(in-table/allocated):%d/%d",
1069 (void *)client,
1070 total_approx_requests,
1071 num_requests_tracked_requests,
1072 client->threaded_data.num_requests_being_prepared,
1073 client->threaded_data.request_queue_size,
1074 num_auto_ranged_get_network_io,
1075 num_auto_ranged_put_network_io,
1076 num_auto_default_network_io,
1077 num_requests_network_io,
1078 num_requests_stream_queued_waiting,
1079 num_requests_streaming,
1080 num_endpoints_in_table,
1081 num_endpoints_allocated);
1082 }
1083
1084 /*******************/
1085 /* Step 5: Check for client shutdown. */
1086 /*******************/
1087 {
1088 aws_s3_client_lock_synced_data(client);
1089 client->synced_data.process_work_task_in_progress = false;
1090
1091 /* This flag should never be set twice. If it was, that means a double-free could occur.*/
1092 AWS_ASSERT(!client->synced_data.finish_destroy);
1093
1094 bool finish_destroy = client->synced_data.active == false &&
1095 client->synced_data.start_destroy_executing == false &&
1096 client->synced_data.body_streaming_elg_allocated == false &&
1097 client->synced_data.process_work_task_scheduled == false &&
1098 client->synced_data.process_work_task_in_progress == false &&
1099 client->synced_data.num_endpoints_allocated == 0;
1100
1101 client->synced_data.finish_destroy = finish_destroy;
1102
1103 if (!client->synced_data.active) {
1104 AWS_LOGF_DEBUG(
1105 AWS_LS_S3_CLIENT,
1106 "id=%p Client shutdown progress: starting_destroy_executing=%d body_streaming_elg_allocated=%d "
1107 "process_work_task_scheduled=%d process_work_task_in_progress=%d num_endpoints_allocated=%d "
1108 "finish_destroy=%d",
1109 (void *)client,
1110 (int)client->synced_data.start_destroy_executing,
1111 (int)client->synced_data.body_streaming_elg_allocated,
1112 (int)client->synced_data.process_work_task_scheduled,
1113 (int)client->synced_data.process_work_task_in_progress,
1114 (int)client->synced_data.num_endpoints_allocated,
1115 (int)client->synced_data.finish_destroy);
1116 }
1117
1118 aws_s3_client_unlock_synced_data(client);
1119
1120 if (finish_destroy) {
1121 client->vtable->finish_destroy(client);
1122 }
1123 }
1124 }
1125
1126 static void s_s3_client_prepare_callback_queue_request(
1127 struct aws_s3_meta_request *meta_request,
1128 struct aws_s3_request *request,
1129 int error_code,
1130 void *user_data);
1131
aws_s3_client_update_meta_requests_threaded(struct aws_s3_client * client)1132 void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
1133 AWS_PRECONDITION(client);
1134
1135 const uint32_t max_requests_in_flight = aws_s3_client_get_max_requests_in_flight(client);
1136 const uint32_t max_requests_prepare = aws_s3_client_get_max_requests_prepare(client);
1137
1138 struct aws_linked_list meta_requests_work_remaining;
1139 aws_linked_list_init(&meta_requests_work_remaining);
1140
1141 uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
1142
1143 const uint32_t pass_flags[] = {
1144 AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
1145 0,
1146 };
1147
1148 const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);
1149
1150 for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {
1151
1152 /* While:
1153 * * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
1154 * preparation stage.
1155 * * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
1156 * * There are meta requests to get requests from.
1157 *
1158 * Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
1159 * etc.) for sending.
1160 */
1161 while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
1162 max_requests_prepare &&
1163 num_requests_in_flight < max_requests_in_flight &&
1164 !aws_linked_list_empty(&client->threaded_data.meta_requests)) {
1165
1166 struct aws_linked_list_node *meta_request_node =
1167 aws_linked_list_begin(&client->threaded_data.meta_requests);
1168 struct aws_s3_meta_request *meta_request =
1169 AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);
1170
1171 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1172 AWS_ASSERT(endpoint != NULL);
1173
1174 AWS_ASSERT(client->vtable->get_host_address_count);
1175 size_t num_known_vips = client->vtable->get_host_address_count(
1176 client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
1177
1178 /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
1179 * ramping up requests just yet. If there is already enough in the queue for one address (even if those
1180 * aren't for this particular endpoint) we skip over this meta request for now. */
1181 if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
1182 client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
1183 aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
1184 aws_linked_list_push_back(
1185 &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
1186 continue;
1187 }
1188
1189 struct aws_s3_request *request = NULL;
1190
1191 /* Try to grab the next request from the meta request. */
1192 bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);
1193
1194 if (work_remaining) {
1195 /* If there is work remaining, but we didn't get a request back, take the meta request out of the
1196 * list so that we don't use it again during this function, with the intention of putting it back in
1197 * the list before this function ends. */
1198 if (request == NULL) {
1199 aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
1200 aws_linked_list_push_back(
1201 &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
1202 } else {
1203 request->tracked_by_client = true;
1204
1205 ++client->threaded_data.num_requests_being_prepared;
1206
1207 num_requests_in_flight =
1208 (uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;
1209
1210 aws_s3_meta_request_prepare_request(
1211 meta_request, request, s_s3_client_prepare_callback_queue_request, client);
1212 }
1213 } else {
1214 s_s3_client_remove_meta_request_threaded(client, meta_request);
1215 }
1216 }
1217
1218 aws_linked_list_move_all_front(&client->threaded_data.meta_requests, &meta_requests_work_remaining);
1219 }
1220 }
1221
s_s3_client_prepare_callback_queue_request(struct aws_s3_meta_request * meta_request,struct aws_s3_request * request,int error_code,void * user_data)1222 static void s_s3_client_prepare_callback_queue_request(
1223 struct aws_s3_meta_request *meta_request,
1224 struct aws_s3_request *request,
1225 int error_code,
1226 void *user_data) {
1227 AWS_PRECONDITION(meta_request);
1228 AWS_PRECONDITION(request);
1229
1230 struct aws_s3_client *client = user_data;
1231 AWS_PRECONDITION(client);
1232
1233 if (error_code != AWS_ERROR_SUCCESS) {
1234 aws_s3_meta_request_finished_request(meta_request, request, error_code);
1235
1236 aws_s3_request_release(request);
1237 request = NULL;
1238 }
1239
1240 aws_s3_client_lock_synced_data(client);
1241
1242 if (error_code == AWS_ERROR_SUCCESS) {
1243 aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node);
1244 } else {
1245 ++client->synced_data.num_failed_prepare_requests;
1246 }
1247
1248 s_s3_client_schedule_process_work_synced(client);
1249 aws_s3_client_unlock_synced_data(client);
1250 }
1251
aws_s3_client_update_connections_threaded(struct aws_s3_client * client)1252 void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
1253 AWS_PRECONDITION(client);
1254 AWS_PRECONDITION(client->vtable);
1255
1256 struct aws_linked_list left_over_requests;
1257 aws_linked_list_init(&left_over_requests);
1258
1259 while (s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX) <
1260 aws_s3_client_get_max_active_connections(client, NULL) &&
1261 !aws_linked_list_empty(&client->threaded_data.request_queue)) {
1262
1263 struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
1264 const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, request->meta_request);
1265
1266 /* Unless the request is marked "always send", if this meta request has a finish result, then finish the request
1267 * now and release it. */
1268 if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
1269 aws_s3_meta_request_finished_request(request->meta_request, request, AWS_ERROR_S3_CANCELED);
1270
1271 aws_s3_request_release(request);
1272 request = NULL;
1273 } else if (
1274 s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
1275 s_s3_client_create_connection_for_request(client, request);
1276 } else {
1277 /* Push the request into the left-over list to be used in a future call of this function. */
1278 aws_linked_list_push_back(&left_over_requests, &request->node);
1279 }
1280 }
1281
1282 aws_s3_client_queue_requests_threaded(client, &left_over_requests, true);
1283 }
1284
1285 static void s_s3_client_acquired_retry_token(
1286 struct aws_retry_strategy *retry_strategy,
1287 int error_code,
1288 struct aws_retry_token *token,
1289 void *user_data);
1290
1291 static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data);
1292
1293 static void s_s3_client_create_connection_for_request_default(
1294 struct aws_s3_client *client,
1295 struct aws_s3_request *request);
1296
s_s3_client_create_connection_for_request(struct aws_s3_client * client,struct aws_s3_request * request)1297 static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request) {
1298 AWS_PRECONDITION(client);
1299 AWS_PRECONDITION(client->vtable);
1300
1301 if (client->vtable->create_connection_for_request) {
1302 client->vtable->create_connection_for_request(client, request);
1303 return;
1304 }
1305
1306 s_s3_client_create_connection_for_request_default(client, request);
1307 }
1308
s_s3_client_create_connection_for_request_default(struct aws_s3_client * client,struct aws_s3_request * request)1309 static void s_s3_client_create_connection_for_request_default(
1310 struct aws_s3_client *client,
1311 struct aws_s3_request *request) {
1312 AWS_PRECONDITION(client);
1313 AWS_PRECONDITION(request);
1314
1315 struct aws_s3_meta_request *meta_request = request->meta_request;
1316 AWS_PRECONDITION(meta_request);
1317
1318 aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
1319
1320 struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));
1321
1322 connection->endpoint = aws_s3_endpoint_acquire(meta_request->endpoint);
1323 connection->request = request;
1324
1325 struct aws_byte_cursor host_header_value;
1326 AWS_ZERO_STRUCT(host_header_value);
1327
1328 struct aws_http_headers *message_headers = aws_http_message_get_headers(meta_request->initial_request_message);
1329 AWS_ASSERT(message_headers);
1330
1331 int get_header_result = aws_http_headers_get(message_headers, g_host_header_name, &host_header_value);
1332 AWS_ASSERT(get_header_result == AWS_OP_SUCCESS);
1333 (void)get_header_result;
1334
1335 if (aws_retry_strategy_acquire_retry_token(
1336 client->retry_strategy, &host_header_value, s_s3_client_acquired_retry_token, connection, 0)) {
1337
1338 AWS_LOGF_ERROR(
1339 AWS_LS_S3_CLIENT,
1340 "id=%p Client could not acquire retry token for request %p due to error %d (%s)",
1341 (void *)client,
1342 (void *)request,
1343 aws_last_error_or_unknown(),
1344 aws_error_str(aws_last_error_or_unknown()));
1345
1346 goto reset_connection;
1347 }
1348
1349 return;
1350
1351 reset_connection:
1352
1353 aws_s3_client_notify_connection_finished(
1354 client, connection, aws_last_error_or_unknown(), AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1355 }
1356
s_s3_client_acquired_retry_token(struct aws_retry_strategy * retry_strategy,int error_code,struct aws_retry_token * token,void * user_data)1357 static void s_s3_client_acquired_retry_token(
1358 struct aws_retry_strategy *retry_strategy,
1359 int error_code,
1360 struct aws_retry_token *token,
1361 void *user_data) {
1362
1363 AWS_PRECONDITION(retry_strategy);
1364 (void)retry_strategy;
1365
1366 struct aws_s3_connection *connection = user_data;
1367 AWS_PRECONDITION(connection);
1368
1369 struct aws_s3_request *request = connection->request;
1370 AWS_PRECONDITION(request);
1371
1372 struct aws_s3_meta_request *meta_request = request->meta_request;
1373 AWS_PRECONDITION(meta_request);
1374
1375 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1376 AWS_ASSERT(endpoint != NULL);
1377
1378 struct aws_s3_client *client = endpoint->user_data;
1379 AWS_ASSERT(client != NULL);
1380
1381 if (error_code != AWS_ERROR_SUCCESS) {
1382 AWS_LOGF_ERROR(
1383 AWS_LS_S3_CLIENT,
1384 "id=%p Client could not get retry token for connection %p processing request %p due to error %d (%s)",
1385 (void *)client,
1386 (void *)connection,
1387 (void *)request,
1388 error_code,
1389 aws_error_str(error_code));
1390
1391 goto error_clean_up;
1392 }
1393
1394 AWS_ASSERT(token);
1395
1396 connection->retry_token = token;
1397
1398 AWS_ASSERT(client->vtable->acquire_http_connection);
1399
1400 client->vtable->acquire_http_connection(
1401 endpoint->http_connection_manager, s_s3_client_on_acquire_http_connection, connection);
1402
1403 return;
1404
1405 error_clean_up:
1406
1407 aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1408 }
1409
s_s3_client_on_acquire_http_connection(struct aws_http_connection * incoming_http_connection,int error_code,void * user_data)1410 static void s_s3_client_on_acquire_http_connection(
1411 struct aws_http_connection *incoming_http_connection,
1412 int error_code,
1413 void *user_data) {
1414
1415 struct aws_s3_connection *connection = user_data;
1416 AWS_PRECONDITION(connection);
1417
1418 struct aws_s3_request *request = connection->request;
1419 AWS_PRECONDITION(request);
1420
1421 struct aws_s3_meta_request *meta_request = request->meta_request;
1422 AWS_PRECONDITION(meta_request);
1423
1424 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1425 AWS_ASSERT(endpoint != NULL);
1426
1427 struct aws_s3_client *client = endpoint->user_data;
1428 AWS_ASSERT(client != NULL);
1429
1430 if (error_code != AWS_ERROR_SUCCESS) {
1431 AWS_LOGF_ERROR(
1432 AWS_LS_S3_ENDPOINT,
1433 "id=%p: Could not acquire connection due to error code %d (%s)",
1434 (void *)endpoint,
1435 error_code,
1436 aws_error_str(error_code));
1437
1438 if (error_code == AWS_IO_DNS_INVALID_NAME) {
1439 goto error_fail;
1440 }
1441
1442 goto error_retry;
1443 }
1444
1445 connection->http_connection = incoming_http_connection;
1446 aws_s3_meta_request_send_request(meta_request, connection);
1447 return;
1448
1449 error_retry:
1450
1451 aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_RETRY);
1452 return;
1453
1454 error_fail:
1455
1456 aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1457 }
1458
1459 /* Called by aws_s3_meta_request when it has finished using this connection for a single request. */
aws_s3_client_notify_connection_finished(struct aws_s3_client * client,struct aws_s3_connection * connection,int error_code,enum aws_s3_connection_finish_code finish_code)1460 void aws_s3_client_notify_connection_finished(
1461 struct aws_s3_client *client,
1462 struct aws_s3_connection *connection,
1463 int error_code,
1464 enum aws_s3_connection_finish_code finish_code) {
1465 AWS_PRECONDITION(client);
1466 AWS_PRECONDITION(connection);
1467
1468 struct aws_s3_request *request = connection->request;
1469 AWS_PRECONDITION(request);
1470
1471 struct aws_s3_meta_request *meta_request = request->meta_request;
1472
1473 AWS_PRECONDITION(meta_request);
1474 AWS_PRECONDITION(meta_request->initial_request_message);
1475
1476 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1477 AWS_PRECONDITION(endpoint);
1478
1479 /* If we're trying to setup a retry... */
1480 if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_RETRY) {
1481
1482 if (connection->retry_token == NULL) {
1483 AWS_LOGF_ERROR(
1484 AWS_LS_S3_CLIENT,
1485 "id=%p Client could not schedule retry of request %p for meta request %p",
1486 (void *)client,
1487 (void *)request,
1488 (void *)meta_request);
1489
1490 goto reset_connection;
1491 }
1492
1493 if (aws_s3_meta_request_is_finished(meta_request)) {
1494 AWS_LOGF_DEBUG(
1495 AWS_LS_S3_CLIENT,
1496 "id=%p Client not scheduling retry of request %p for meta request %p with token %p because meta "
1497 "request has been flagged as finished.",
1498 (void *)client,
1499 (void *)request,
1500 (void *)meta_request,
1501 (void *)connection->retry_token);
1502
1503 goto reset_connection;
1504 }
1505
1506 AWS_LOGF_DEBUG(
1507 AWS_LS_S3_CLIENT,
1508 "id=%p Client scheduling retry of request %p for meta request %p with token %p.",
1509 (void *)client,
1510 (void *)request,
1511 (void *)meta_request,
1512 (void *)connection->retry_token);
1513
1514 enum aws_retry_error_type error_type = AWS_RETRY_ERROR_TYPE_TRANSIENT;
1515
1516 switch (error_code) {
1517 case AWS_ERROR_S3_INTERNAL_ERROR:
1518 error_type = AWS_RETRY_ERROR_TYPE_SERVER_ERROR;
1519 break;
1520
1521 case AWS_ERROR_S3_SLOW_DOWN:
1522 error_type = AWS_RETRY_ERROR_TYPE_THROTTLING;
1523 break;
1524 }
1525
1526 if (connection->http_connection != NULL) {
1527 AWS_ASSERT(endpoint->http_connection_manager);
1528
1529 aws_http_connection_manager_release_connection(
1530 endpoint->http_connection_manager, connection->http_connection);
1531
1532 connection->http_connection = NULL;
1533 }
1534
1535 /* Ask the retry strategy to schedule a retry of the request. */
1536 if (aws_retry_strategy_schedule_retry(
1537 connection->retry_token, error_type, s_s3_client_retry_ready, connection)) {
1538 error_code = aws_last_error_or_unknown();
1539
1540 AWS_LOGF_ERROR(
1541 AWS_LS_S3_CLIENT,
1542 "id=%p Client could not retry request %p for meta request %p with token %p due to error %d (%s)",
1543 (void *)client,
1544 (void *)request,
1545 (void *)meta_request,
1546 (void *)connection->retry_token,
1547 error_code,
1548 aws_error_str(error_code));
1549
1550 goto reset_connection;
1551 }
1552
1553 return;
1554 }
1555
1556 reset_connection:
1557
1558 if (connection->retry_token != NULL) {
1559 /* If we have a retry token and successfully finished, record that success. */
1560 if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
1561 aws_retry_token_record_success(connection->retry_token);
1562 }
1563
1564 aws_retry_token_release(connection->retry_token);
1565 connection->retry_token = NULL;
1566 }
1567
1568 /* If we weren't successful, and we're here, that means this failure is not eligible for a retry. So finish the
1569 * request, and close our HTTP connection. */
1570 if (finish_code != AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
1571 if (connection->http_connection != NULL) {
1572 aws_http_connection_close(connection->http_connection);
1573 }
1574 }
1575
1576 aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
1577
1578 aws_s3_meta_request_finished_request(meta_request, request, error_code);
1579
1580 if (connection->http_connection != NULL) {
1581 AWS_ASSERT(endpoint->http_connection_manager);
1582
1583 aws_http_connection_manager_release_connection(endpoint->http_connection_manager, connection->http_connection);
1584
1585 connection->http_connection = NULL;
1586 }
1587
1588 if (connection->request != NULL) {
1589 aws_s3_request_release(connection->request);
1590 connection->request = NULL;
1591 }
1592
1593 aws_retry_token_release(connection->retry_token);
1594 connection->retry_token = NULL;
1595
1596 aws_s3_endpoint_release(connection->endpoint);
1597 connection->endpoint = NULL;
1598
1599 aws_mem_release(client->allocator, connection);
1600 connection = NULL;
1601
1602 aws_s3_client_lock_synced_data(client);
1603 s_s3_client_schedule_process_work_synced(client);
1604 aws_s3_client_unlock_synced_data(client);
1605 }
1606
1607 static void s_s3_client_prepare_request_callback_retry_request(
1608 struct aws_s3_meta_request *meta_request,
1609 struct aws_s3_request *request,
1610 int error_code,
1611 void *user_data);
1612
s_s3_client_retry_ready(struct aws_retry_token * token,int error_code,void * user_data)1613 static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data) {
1614 AWS_PRECONDITION(token);
1615 (void)token;
1616
1617 struct aws_s3_connection *connection = user_data;
1618 AWS_PRECONDITION(connection);
1619
1620 struct aws_s3_request *request = connection->request;
1621 AWS_PRECONDITION(request);
1622
1623 struct aws_s3_meta_request *meta_request = request->meta_request;
1624 AWS_PRECONDITION(meta_request);
1625
1626 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1627 AWS_PRECONDITION(endpoint);
1628
1629 struct aws_s3_client *client = endpoint->user_data;
1630 AWS_PRECONDITION(client);
1631
1632 /* If we couldn't retry this request, then bail on the entire meta request. */
1633 if (error_code != AWS_ERROR_SUCCESS) {
1634
1635 AWS_LOGF_ERROR(
1636 AWS_LS_S3_CLIENT,
1637 "id=%p Client could not retry request %p for meta request %p due to error %d (%s)",
1638 (void *)client,
1639 (void *)meta_request,
1640 (void *)request,
1641 error_code,
1642 aws_error_str(error_code));
1643
1644 goto error_clean_up;
1645 }
1646
1647 AWS_LOGF_DEBUG(
1648 AWS_LS_S3_META_REQUEST,
1649 "id=%p Client retrying request %p for meta request %p on connection %p with retry token %p",
1650 (void *)client,
1651 (void *)request,
1652 (void *)meta_request,
1653 (void *)connection,
1654 (void *)connection->retry_token);
1655
1656 aws_s3_meta_request_prepare_request(
1657 meta_request, request, s_s3_client_prepare_request_callback_retry_request, connection);
1658
1659 return;
1660
1661 error_clean_up:
1662
1663 aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1664 }
1665
s_s3_client_prepare_request_callback_retry_request(struct aws_s3_meta_request * meta_request,struct aws_s3_request * request,int error_code,void * user_data)1666 static void s_s3_client_prepare_request_callback_retry_request(
1667 struct aws_s3_meta_request *meta_request,
1668 struct aws_s3_request *request,
1669 int error_code,
1670 void *user_data) {
1671 AWS_PRECONDITION(meta_request);
1672 (void)meta_request;
1673
1674 AWS_PRECONDITION(request);
1675 (void)request;
1676
1677 struct aws_s3_connection *connection = user_data;
1678 AWS_PRECONDITION(connection);
1679
1680 struct aws_s3_endpoint *endpoint = meta_request->endpoint;
1681 AWS_ASSERT(endpoint != NULL);
1682
1683 struct aws_s3_client *client = endpoint->user_data;
1684 AWS_ASSERT(client != NULL);
1685
1686 if (error_code == AWS_ERROR_SUCCESS) {
1687 AWS_ASSERT(connection->retry_token);
1688
1689 s_s3_client_acquired_retry_token(
1690 client->retry_strategy, AWS_ERROR_SUCCESS, connection->retry_token, connection);
1691 } else {
1692 aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
1693 }
1694 }
1695
1696 /* Called by aws_s3_request when it has finished being destroyed */
aws_s3_client_notify_request_destroyed(struct aws_s3_client * client,struct aws_s3_request * request)1697 void aws_s3_client_notify_request_destroyed(struct aws_s3_client *client, struct aws_s3_request *request) {
1698 AWS_PRECONDITION(client);
1699 AWS_PRECONDITION(request);
1700
1701 if (request->tracked_by_client) {
1702 aws_s3_client_lock_synced_data(client);
1703 aws_atomic_fetch_sub(&client->stats.num_requests_in_flight, 1);
1704 s_s3_client_schedule_process_work_synced(client);
1705 aws_s3_client_unlock_synced_data(client);
1706 }
1707 }
1708