Skip to content

Commit

Permalink
support meta request level override of part size and mpu threshold (#393
Browse files Browse the repository at this point in the history
)

Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
TingDaoK and graebm authored Dec 12, 2023
1 parent fcd7a10 commit bb6af37
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 30 deletions.
54 changes: 46 additions & 8 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,21 +389,37 @@ struct aws_s3_client_config {
*/
struct aws_signing_config_aws *signing_config;

/* Size of parts the files will be downloaded or uploaded in. */
/**
* Optional.
* Size of parts the object will be downloaded or uploaded in, in bytes.
* This only affects AWS_S3_META_REQUEST_TYPE_GET_OBJECT and AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If not set, this defaults to 8 MiB.
* The client will adjust the part size for AWS_S3_META_REQUEST_TYPE_PUT_OBJECT if needed for service limits (max
* number of parts per upload is 10,000, minimum upload part size is 5 MiB).
*
* You can also set this per meta-request, via `aws_s3_meta_request_options.part_size`.
*/
uint64_t part_size;

/* If the part size needs to be adjusted for service limits, this is the maximum size it will be adjusted to. On a
* 32-bit machine, it will be forced to SIZE_MAX, which is around 4 GiB. The server limit is 5 GiB, but the object
* size limit is 5 TiB for now, so this should be sufficient for all cases. */
uint64_t max_part_size;

/* The size threshold in bytes for when to use multipart uploads for a AWS_S3_META_REQUEST_TYPE_PUT_OBJECT meta
* request. Uploads over this size will automatically use a multipart upload strategy,while uploads smaller or
* equal to this threshold will use a single request to upload the whole object. If not set, `part_size` will be
* used as threshold. */
/**
* Optional.
* The size threshold in bytes for when to use multipart uploads.
* Uploads larger than this will use the multipart upload strategy.
* Uploads smaller or equal to this will use a single HTTP request.
* This only affects AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If set, this should be at least `part_size`.
* If not set, the larger of `part_size` and 5 MiB will be used.
*
* You can also set this per meta-request, via `aws_s3_meta_request_options.multipart_upload_threshold`.
*/
uint64_t multipart_upload_threshold;

/* Throughput target in Gbps that we are trying to reach. */
/* Throughput target in gigabits per second (Gbps) that we are trying to reach. */
double throughput_target_gbps;

/* How much memory can we use. */
Expand Down Expand Up @@ -554,8 +570,6 @@ struct aws_s3_checksum_config {
* 3) If the data will be produced in asynchronous chunks, set `send_async_stream`.
*/
struct aws_s3_meta_request_options {
/* TODO: The meta request options cannot control the request to be split or not. Should consider to add one */

/* The type of meta request we will be trying to accelerate. */
enum aws_s3_meta_request_type type;

Expand Down Expand Up @@ -612,6 +626,30 @@ struct aws_s3_meta_request_options {
*/
const struct aws_s3_checksum_config *checksum_config;

/**
* Optional.
* Size of parts the object will be downloaded or uploaded in, in bytes.
* This only affects AWS_S3_META_REQUEST_TYPE_GET_OBJECT and AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If not set, the value from `aws_s3_client_config.part_size` is used, which defaults to 8MiB.
*
* The client will adjust the part size for AWS_S3_META_REQUEST_TYPE_PUT_OBJECT if needed for service limits (max
* number of parts per upload is 10,000, minimum upload part size is 5 MiB).
*/
uint64_t part_size;

/**
* Optional.
* The size threshold in bytes for when to use multipart uploads.
* Uploads larger than this will use the multipart upload strategy.
* Uploads smaller or equal to this will use a single HTTP request.
* This only affects AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If set, this should be at least `part_size`.
* If not set, `part_size` adjusted by client will be used as the threshold.
* If both `part_size` and `multipart_upload_threshold` are not set,
* the values from `aws_s3_client_config` are used.
*/
uint64_t multipart_upload_threshold;

/* User data for all callbacks. */
void *user_data;

Expand Down
70 changes: 48 additions & 22 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,13 @@ struct aws_s3_client *aws_s3_client_new(
mem_limit = client_config->memory_limit_in_bytes;
}

size_t part_size;
size_t part_size = s_default_part_size;
if (client_config->part_size != 0) {
part_size = (size_t)client_config->part_size;
} else {
part_size = s_default_part_size;
if (client_config->part_size > SIZE_MAX) {
part_size = SIZE_MAX;
} else {
part_size = (size_t)client_config->part_size;
}
}

client->buffer_pool = aws_s3_buffer_pool_new(allocator, part_size, mem_limit);
Expand Down Expand Up @@ -424,6 +426,9 @@ struct aws_s3_client *aws_s3_client_new(

if (client_config->multipart_upload_threshold != 0) {
*((uint64_t *)&client->multipart_upload_threshold) = client_config->multipart_upload_threshold;
} else {
*((uint64_t *)&client->multipart_upload_threshold) =
part_size > g_s3_min_upload_part_size ? part_size : g_s3_min_upload_part_size;
}

if (client_config->max_part_size < client_config->part_size) {
Expand Down Expand Up @@ -1151,6 +1156,14 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
}
size_t part_size = client->part_size;
if (options->part_size != 0) {
if (options->part_size > SIZE_MAX) {
part_size = SIZE_MAX;
} else {
part_size = (size_t)options->part_size;
}
}

/* Call the appropriate meta-request new function. */
switch (options->type) {
Expand All @@ -1169,7 +1182,7 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
options);
}

return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, client->part_size, options);
return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, part_size, options);
}
case AWS_S3_META_REQUEST_TYPE_PUT_OBJECT: {
if (body_source_count == 0) {
Expand All @@ -1182,19 +1195,17 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
}

if (options->resume_token == NULL) {

size_t client_part_size = client->part_size;
uint64_t client_max_part_size = client->max_part_size;

if (client_part_size < g_s3_min_upload_part_size) {
if (part_size < g_s3_min_upload_part_size) {
AWS_LOGF_WARN(
AWS_LS_S3_META_REQUEST,
"Client config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
"Config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
". Using to the minimum part-size for upload.",
(uint64_t)client_part_size,
(uint64_t)part_size,
(uint64_t)g_s3_min_upload_part_size);

client_part_size = g_s3_min_upload_part_size;
part_size = g_s3_min_upload_part_size;
}

if (client_max_part_size < (uint64_t)g_s3_min_upload_part_size) {
Expand All @@ -1208,8 +1219,32 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(

client_max_part_size = (uint64_t)g_s3_min_upload_part_size;
}
uint64_t multipart_upload_threshold =
client->multipart_upload_threshold == 0 ? client_part_size : client->multipart_upload_threshold;

uint32_t num_parts = 0;
if (content_length_found) {
size_t out_part_size = 0;
if (aws_s3_calculate_optimal_mpu_part_size_and_num_parts(
content_length, part_size, client_max_part_size, &out_part_size, &num_parts)) {
return NULL;
}
part_size = out_part_size;
}
if (part_size != options->part_size && part_size != client->part_size) {
AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"The multipart upload part size has been adjusted to %" PRIu64 "",
(uint64_t)part_size);
}

/* Default to client level setting */
uint64_t multipart_upload_threshold = client->multipart_upload_threshold;
if (options->multipart_upload_threshold != 0) {
/* If the threshold is set for the meta request, use it */
multipart_upload_threshold = options->multipart_upload_threshold;
} else if (options->part_size != 0) {
/* If the threshold is not set, but the part size is set for the meta request, use it */
multipart_upload_threshold = part_size;
}

if (content_length_found && content_length <= multipart_upload_threshold) {
return aws_s3_meta_request_default_new(
Expand All @@ -1233,15 +1268,6 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
}
}

size_t part_size = client_part_size;
uint32_t num_parts = 0;
if (content_length_found) {
if (aws_s3_calculate_optimal_mpu_part_size_and_num_parts(
content_length, client_part_size, client_max_part_size, &part_size, &num_parts)) {
return NULL;
}
}

return aws_s3_meta_request_auto_ranged_put_new(
client->allocator, client, part_size, content_length_found, content_length, num_parts, options);
} else { /* else using resume token */
Expand Down
6 changes: 6 additions & 0 deletions source/s3_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,12 @@ int aws_s3_calculate_optimal_mpu_part_size_and_num_parts(
AWS_FATAL_ASSERT(out_part_size);
AWS_FATAL_ASSERT(out_num_parts);

if (content_length == 0) {
*out_part_size = 0;
*out_num_parts = 0;
return AWS_OP_SUCCESS;
}

uint64_t part_size_uint64 = content_length / (uint64_t)g_s3_max_num_upload_parts;

if ((content_length % g_s3_max_num_upload_parts) > 0) {
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ add_test_case(test_s3_buffer_pool_too_small)
add_net_test_case(test_s3_put_object_buffer_pool_trim)

add_net_test_case(client_update_upload_part_timeout)
add_net_test_case(client_meta_request_override_part_size)
add_net_test_case(client_meta_request_override_multipart_upload_threshold)

set(TEST_BINARY_NAME ${PROJECT_NAME}-tests)
generate_test_driver(${TEST_BINARY_NAME})
Expand Down
141 changes: 141 additions & 0 deletions tests/s3_client_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,144 @@ TEST_CASE(client_update_upload_part_timeout) {
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

/**
 * Test that a meta request can override the client-level part size.
 *
 * The client is created with an 8 MiB part size. A PUT and a GET meta request are then created directly through
 * the client's meta request factory with `part_size` set to 10 MiB, and the resulting meta requests are expected
 * to carry the overridden part size. The meta requests are only constructed and inspected here.
 */
TEST_CASE(client_meta_request_override_part_size) {
    (void)ctx;
    struct aws_s3_tester tester;
    AWS_ZERO_STRUCT(tester);
    ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
    struct aws_s3_client *client = NULL;
    struct aws_s3_tester_client_options client_options = {
        .part_size = MB_TO_BYTES(8),
        .tls_usage = AWS_S3_TLS_DISABLED,
    };
    ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client));

    struct aws_string *host_name =
        aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);
    struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(host_name);
    struct aws_byte_cursor test_object_path = aws_byte_cursor_from_c_str("/mytest");

    size_t override_part_size = MB_TO_BYTES(10);
    /* Make the content length larger than the override part size to make sure we do MPU */
    size_t content_length = MB_TO_BYTES(20);

    /* MPU put object */
    struct aws_input_stream_tester_options stream_options = {
        .autogen_length = content_length,
    };
    struct aws_input_stream *input_stream = aws_input_stream_new_tester(allocator, &stream_options);

    struct aws_http_message *put_messages = aws_s3_test_put_object_request_new(
        allocator, &host_cur, g_test_body_content_type, test_object_path, input_stream, 0 /*flags*/);

    struct aws_s3_meta_request_options meta_request_options = {
        .message = put_messages,
        .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
        .part_size = override_part_size,
    };
    struct aws_s3_meta_request *put_meta_request = client->vtable->meta_request_factory(client, &meta_request_options);
    /* The factory returns NULL on failure; fail the test cleanly instead of dereferencing NULL below. */
    ASSERT_TRUE(put_meta_request != NULL);
    ASSERT_UINT_EQUALS(put_meta_request->part_size, override_part_size);

    /* auto ranged Get Object */
    struct aws_http_message *get_message =
        aws_s3_test_get_object_request_new(allocator, host_cur, g_pre_existing_object_1MB);

    struct aws_s3_meta_request_options get_meta_request_options = {
        .message = get_message,
        .type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
        .part_size = override_part_size,
    };

    struct aws_s3_meta_request *get_meta_request =
        client->vtable->meta_request_factory(client, &get_meta_request_options);
    ASSERT_TRUE(get_meta_request != NULL);
    ASSERT_UINT_EQUALS(get_meta_request->part_size, override_part_size);

    aws_http_message_release(put_messages);
    aws_s3_meta_request_release(put_meta_request);
    aws_http_message_release(get_message);
    aws_s3_meta_request_release(get_meta_request);
    aws_string_destroy(host_name);
    aws_s3_client_release(client);
    aws_input_stream_release(input_stream);
    aws_s3_tester_clean_up(&tester);

    return AWS_OP_SUCCESS;
}

/**
 * Test that a meta request can override the client-level multipart upload threshold.
 *
 * The client is created with an 8 MiB part size and a 15 MiB multipart upload threshold. A 20 MiB PUT is then
 * created twice through the client's meta request factory:
 *  1) with `multipart_upload_threshold` overridden to 20 MiB;
 *  2) with only `part_size` overridden to 20 MiB, in which case the part size is used as the threshold.
 * In both cases the content length is equal to the effective threshold, so the meta request is expected to be a
 * non-multipart upload, which is observed here as a part size of 0.
 */
TEST_CASE(client_meta_request_override_multipart_upload_threshold) {
    (void)ctx;
    struct aws_s3_tester tester;
    /* Zero the tester before init, consistent with the other tests in this file. */
    AWS_ZERO_STRUCT(tester);
    ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

    struct aws_s3_client_config client_config = {
        .part_size = MB_TO_BYTES(8),
        .multipart_upload_threshold = MB_TO_BYTES(15),
    };

    ASSERT_SUCCESS(aws_s3_tester_bind_client(
        &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING));

    struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

    ASSERT_TRUE(client != NULL);

    struct aws_string *host_name =
        aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);
    struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(host_name);
    struct aws_byte_cursor test_object_path = aws_byte_cursor_from_c_str("/mytest");

    size_t override_multipart_upload_threshold = MB_TO_BYTES(20);
    /* Content length is larger than the client-level threshold (15 MiB), but equal to the override threshold, so
     * MPU is only avoided if the override takes effect. */
    size_t content_length = MB_TO_BYTES(20);

    /* Put object that is expected NOT to be split into parts */
    struct aws_input_stream_tester_options stream_options = {
        .autogen_length = content_length,
    };
    struct aws_input_stream *input_stream = aws_input_stream_new_tester(allocator, &stream_options);

    struct aws_http_message *put_messages = aws_s3_test_put_object_request_new(
        allocator, &host_cur, g_test_body_content_type, test_object_path, input_stream, 0 /*flags*/);

    {
        /* Content length is equal to the override multipart_upload_threshold, so a single request is used */
        struct aws_s3_meta_request_options meta_request_options = {
            .message = put_messages,
            .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
            .multipart_upload_threshold = override_multipart_upload_threshold,
        };
        struct aws_s3_meta_request *put_meta_request =
            client->vtable->meta_request_factory(client, &meta_request_options);
        /* The factory returns NULL on failure; fail the test cleanly instead of dereferencing NULL below. */
        ASSERT_TRUE(put_meta_request != NULL);

        /* Part size will be 0, as we don't use MPU */
        ASSERT_UINT_EQUALS(put_meta_request->part_size, 0);
        aws_s3_meta_request_release(put_meta_request);
    }

    {
        /* The meta request overrides the part size but not the threshold, so the override part size will be used
         * as the multipart upload threshold */
        struct aws_s3_meta_request_options meta_request_options = {
            .message = put_messages,
            .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
            .part_size = override_multipart_upload_threshold,
        };
        struct aws_s3_meta_request *put_meta_request =
            client->vtable->meta_request_factory(client, &meta_request_options);
        ASSERT_TRUE(put_meta_request != NULL);

        /* Part size will be 0, as we don't use MPU */
        ASSERT_UINT_EQUALS(put_meta_request->part_size, 0);
        aws_s3_meta_request_release(put_meta_request);
    }

    aws_http_message_release(put_messages);
    aws_string_destroy(host_name);
    aws_s3_client_release(client);
    aws_input_stream_release(input_stream);
    aws_s3_tester_clean_up(&tester);

    return AWS_OP_SUCCESS;
}

0 comments on commit bb6af37

Please sign in to comment.