Skip to content

Commit

Permalink
Bypass for CreateSession reqeust (#384)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Nov 29, 2023
1 parent ecd0143 commit de36fee
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 40 deletions.
78 changes: 55 additions & 23 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,54 @@ static void s_s3_client_prepare_callback_queue_request(
int error_code,
void *user_data);

static bool s_s3_client_should_update_meta_request(
struct aws_s3_client *client,
struct aws_s3_meta_request *meta_request,
uint32_t num_requests_in_flight,
const uint32_t max_requests_in_flight,
const uint32_t max_requests_prepare) {

/* CreateSession has high priority to bypass the checks. */
if (meta_request->type == AWS_S3_META_REQUEST_TYPE_DEFAULT) {
struct aws_s3_meta_request_default *meta_request_default = meta_request->impl;
if (aws_string_eq_c_str(meta_request_default->operation_name, "CreateSession")) {
return true;
}
}

/**
* If number of being-prepared + already-prepared-and-queued requests is more than the max that can
* be in the preparation stage.
* Or total number of requests tracked by the client is more than the max tracked ("in flight")
* requests.
*
* We cannot create more requests for this meta request.
*/
if ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) >=
max_requests_prepare) {
return false;
}
if (num_requests_in_flight >= max_requests_in_flight) {
return false;
}

/* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
* ramping up requests just yet. If there is already enough in the queue for one address (even if those
* aren't for this particular endpoint) we skip over this meta request for now. */
struct aws_s3_endpoint *endpoint = meta_request->endpoint;
AWS_ASSERT(endpoint != NULL);
AWS_ASSERT(client->vtable->get_host_address_count);
size_t num_known_vips = client->vtable->get_host_address_count(
client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
return false;
}

/* Nothing blocks the meta request to create more requests */
return true;
}

void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
AWS_PRECONDITION(client);

Expand All @@ -1628,37 +1676,21 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {

for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {

/* While:
* * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
* preparation stage.
* * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
* * There are meta requests to get requests from.
*
* Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
* etc.) for sending.
/**
* Iterate through the meta requests to update meta requests and get new requests that can then be prepared
+ * (reading from any streams, signing, etc.) for sending.
*/
while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
max_requests_prepare &&
num_requests_in_flight < max_requests_in_flight &&
!aws_linked_list_empty(&client->threaded_data.meta_requests)) {
while (!aws_linked_list_empty(&client->threaded_data.meta_requests)) {

struct aws_linked_list_node *meta_request_node =
aws_linked_list_begin(&client->threaded_data.meta_requests);
struct aws_s3_meta_request *meta_request =
AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);

struct aws_s3_endpoint *endpoint = meta_request->endpoint;
AWS_ASSERT(endpoint != NULL);

AWS_ASSERT(client->vtable->get_host_address_count);
size_t num_known_vips = client->vtable->get_host_address_count(
client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
if (!s_s3_client_should_update_meta_request(
client, meta_request, num_requests_in_flight, max_requests_in_flight, max_requests_prepare)) {

/* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
* ramping up requests just yet. If there is already enough in the queue for one address (even if those
* aren't for this particular endpoint) we skip over this meta request for now. */
if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
/* Move the meta request to be processed from next loop. */
aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
aws_linked_list_push_back(
&meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
Expand Down
1 change: 1 addition & 0 deletions source/s3express_credentials_provider.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ static struct aws_s3express_session_creator *s_session_creator_new(
/* Override endpoint only for tests. */
.endpoint = impl->mock_test.endpoint_override ? impl->mock_test.endpoint_override : NULL,
.user_data = session_creator,
.operation_name = aws_byte_cursor_from_c_str("CreateSession"),
};
session_creator->synced_data.meta_request = aws_s3_client_make_meta_request(impl->client, &options);
AWS_FATAL_ASSERT(session_creator->synced_data.meta_request);
Expand Down
13 changes: 8 additions & 5 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,14 @@ if(ENABLE_MOCK_SERVER_TESTS)
add_net_test_case(request_time_too_skewed_mock_server)
endif()

add_net_test_case(s3express_provider_long_run_real_server)
add_net_test_case(s3express_client_put_test_small_real_server)
add_net_test_case(s3express_client_put_test_large_real_server)
add_net_test_case(s3express_client_put_long_running_test_real_server)
add_net_test_case(s3express_client_get_test_real_server)
add_net_test_case(s3express_provider_long_running_session_refresh)

add_net_test_case(s3express_client_put_object)
add_net_test_case(s3express_client_put_object_multipart)
add_net_test_case(s3express_client_put_object_multipart_multiple)
add_net_test_case(s3express_client_put_object_long_running_session_refresh)
add_net_test_case(s3express_client_get_object)
add_net_test_case(s3express_client_get_object_multiple)

add_net_test_case(meta_request_auto_ranged_get_new_error_handling)
add_net_test_case(meta_request_auto_ranged_put_new_error_handling)
Expand Down
3 changes: 2 additions & 1 deletion tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,8 @@ static int s_test_s3_multipart_put_object_with_acl(struct aws_allocator *allocat

static int s_test_s3_put_object_multiple_helper(struct aws_allocator *allocator, bool file_on_disk) {

#define NUM_REQUESTS 5
enum s_numbers { NUM_REQUESTS = 5 };

struct aws_s3_meta_request *meta_requests[NUM_REQUESTS];
struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS];
struct aws_http_message *messages[NUM_REQUESTS];
Expand Down
5 changes: 1 addition & 4 deletions tests/s3_mock_server_s3express_provider_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ TEST_CASE(s3express_provider_stress_mock_server) {

/* Stress about under load, keep hitting 10 hosts */
for (size_t i = 0; i < num_requests; i++) {
/* code */
char key_buffer[128] = "";
snprintf(key_buffer, sizeof(key_buffer), "test-%zu", (size_t)(i % 10));
struct aws_credentials_properties_s3express property = {
Expand All @@ -562,7 +561,6 @@ TEST_CASE(s3express_provider_stress_mock_server) {
/* Stress about over load, keep hitting different hosts */
s_s3express_tester.credentials_callbacks_received = 0;
for (size_t i = 0; i < num_requests; i++) {
/* code */
char key_buffer[128] = "";
snprintf(key_buffer, sizeof(key_buffer), "test-%zu", i);
struct aws_credentials_properties_s3express property = {
Expand All @@ -583,7 +581,7 @@ TEST_CASE(s3express_provider_stress_mock_server) {
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_provider_long_run_real_server) {
TEST_CASE(s3express_provider_long_running_session_refresh) {
(void)ctx;

struct aws_s3_tester tester;
Expand Down Expand Up @@ -637,7 +635,6 @@ TEST_CASE(s3express_provider_long_run_real_server) {
}
/**
* We should have more than 2 different creds.
* Server can return a credentials that expires less than 5 mins.
**/
ASSERT_TRUE(s_s3express_tester.number_of_credentials >= 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ static int s_s3express_put_object_request(
return AWS_OP_SUCCESS;
}

static int s_s3express_client_put_test_real_server_helper(struct aws_allocator *allocator, size_t content_length) {
static int s_s3express_client_put_test_helper(struct aws_allocator *allocator, size_t content_length) {

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
Expand Down Expand Up @@ -256,14 +256,104 @@ static int s_s3express_client_put_test_real_server_helper(struct aws_allocator *
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_client_put_test_small_real_server) {
TEST_CASE(s3express_client_put_object) {
(void)ctx;
return s_s3express_client_put_test_real_server_helper(allocator, MB_TO_BYTES(1));
return s_s3express_client_put_test_helper(allocator, MB_TO_BYTES(1));
}

TEST_CASE(s3express_client_put_test_large_real_server) {
TEST_CASE(s3express_client_put_object_multipart) {
(void)ctx;
return s_s3express_client_put_test_real_server_helper(allocator, MB_TO_BYTES(100));
return s_s3express_client_put_test_helper(allocator, MB_TO_BYTES(100));
}

TEST_CASE(s3express_client_put_object_multipart_multiple) {
(void)ctx;

enum s_numbers { NUM_REQUESTS = 100 };

struct aws_s3_meta_request *meta_requests[NUM_REQUESTS];
struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS];
struct aws_input_stream *input_streams[NUM_REQUESTS];

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_byte_cursor region_cursor = aws_byte_cursor_from_c_str("us-east-1");

char endpoint[] = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com";
struct aws_byte_cursor host_cursor = aws_byte_cursor_from_c_str(endpoint);
struct aws_byte_cursor key_cursor = aws_byte_cursor_from_c_str("/crt-test");

struct aws_byte_cursor west2_region_cursor = aws_byte_cursor_from_c_str("us-west-2");
char west2_endpoint[] = "crts-west2--usw2-az1--x-s3.s3express-usw2-az1.us-west-2.amazonaws.com";
struct aws_byte_cursor west2_host_cursor = aws_byte_cursor_from_c_str(west2_endpoint);

struct aws_s3_client_config client_config = {
.part_size = MB_TO_BYTES(5),
.enable_s3express = true,
.region = region_cursor,
};

ASSERT_SUCCESS(aws_s3_tester_bind_client(&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_SIGNING));

struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
input_streams[i] = aws_s3_test_input_stream_new(allocator, MB_TO_BYTES(10));

struct aws_byte_cursor request_region = region_cursor;
struct aws_byte_cursor request_host = host_cursor;
if (i % 2 == 0) {
/* Make half of request to east1 and rest half to west2 */
request_region = west2_region_cursor;
request_host = west2_host_cursor;
}

struct aws_http_message *message = aws_s3_test_put_object_request_new(
allocator, &request_host, key_cursor, g_test_body_content_type, input_streams[i], 0);

struct aws_s3_meta_request_options options;
AWS_ZERO_STRUCT(options);
options.type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
options.message = message;
struct aws_signing_config_aws s3express_signing_config = {
.algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
.service = g_s3express_service_name,
.region = request_region,
};
options.signing_config = &s3express_signing_config;
aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator);

ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i]));

meta_requests[i] = aws_s3_client_make_meta_request(client, &options);
ASSERT_TRUE(meta_requests[i] != NULL);
aws_http_message_release(message);
}
/* Wait for the request to finish. */
aws_s3_tester_wait_for_meta_request_finish(&tester);
aws_s3_tester_lock_synced_data(&tester);
ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS);
aws_s3_tester_unlock_synced_data(&tester);

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]);
}

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
aws_s3_tester_validate_put_object_results(&meta_request_test_results[i], 0);
aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]);
}

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
aws_input_stream_release(input_streams[i]);
}

aws_s3_client_release(client);
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

void s_meta_request_finished_overhead(
Expand Down Expand Up @@ -300,7 +390,7 @@ struct aws_s3express_credentials_provider *s_s3express_provider_mock_factory(
}

/* Long running test to make sure our refresh works properly */
TEST_CASE(s3express_client_put_long_running_test_real_server) {
TEST_CASE(s3express_client_put_object_long_running_session_refresh) {
(void)ctx;

struct aws_s3_tester tester;
Expand Down Expand Up @@ -375,7 +465,7 @@ TEST_CASE(s3express_client_put_long_running_test_real_server) {
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_client_get_test_real_server) {
TEST_CASE(s3express_client_get_object) {
(void)ctx;

struct aws_s3_tester tester;
Expand Down Expand Up @@ -429,3 +519,73 @@ TEST_CASE(s3express_client_get_test_real_server) {
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_client_get_object_multiple) {
(void)ctx;

struct aws_s3_meta_request *meta_requests[100];
struct aws_s3_meta_request_test_results meta_request_test_results[100];
size_t num_meta_requests = AWS_ARRAY_SIZE(meta_requests);

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_byte_cursor region_cursor = aws_byte_cursor_from_c_str("us-east-1");

char endpoint[] = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com";
struct aws_byte_cursor host_cursor = aws_byte_cursor_from_c_str(endpoint);
struct aws_byte_cursor key_cursor = aws_byte_cursor_from_c_str("/crt-download-10MB");

struct aws_s3_client_config client_config = {
.part_size = MB_TO_BYTES(5),
.enable_s3express = true,
.region = region_cursor,
};

ASSERT_SUCCESS(aws_s3_tester_bind_client(&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_SIGNING));

struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

for (size_t i = 0; i < num_meta_requests; ++i) {

struct aws_http_message *message = aws_s3_test_get_object_request_new(allocator, host_cursor, key_cursor);

struct aws_s3_meta_request_options options;
AWS_ZERO_STRUCT(options);
options.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT;
options.message = message;
struct aws_signing_config_aws s3express_signing_config = {
.algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
.service = g_s3express_service_name,
};
options.signing_config = &s3express_signing_config;
aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator);

ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i]));

meta_requests[i] = aws_s3_client_make_meta_request(client, &options);
ASSERT_TRUE(meta_requests[i] != NULL);

aws_http_message_release(message);
}
/* Wait for the request to finish. */
aws_s3_tester_wait_for_meta_request_finish(&tester);
aws_s3_tester_lock_synced_data(&tester);
ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS);
aws_s3_tester_unlock_synced_data(&tester);

for (size_t i = 0; i < num_meta_requests; ++i) {
meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]);
}

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

for (size_t i = 0; i < num_meta_requests; ++i) {
aws_s3_tester_validate_get_object_results(&meta_request_test_results[i], 0);
aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]);
}

aws_s3_client_release(client);
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

0 comments on commit de36fee

Please sign in to comment.