Skip to content

Commit

Permalink
Fix: Hang on file upload after period of inactivity (#347)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Sep 11, 2023
1 parent 3a30ba8 commit 1fe3464
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 25 deletions.
5 changes: 2 additions & 3 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ struct aws_s3_client {
/* Number of requests sitting in their meta request priority queue, waiting to be streamed. */
struct aws_atomic_var num_requests_stream_queued_waiting;

/* Number of requests currently scheduled to be streamed or are actively being streamed. */
struct aws_atomic_var num_requests_streaming;
/* Number of requests whose response body is currently scheduled to be streamed or is actively being streamed. */
struct aws_atomic_var num_requests_streaming_response;
} stats;

struct {
Expand Down Expand Up @@ -344,7 +344,6 @@ struct aws_s3_client {

/* Number of requests currently being prepared. */
uint32_t num_requests_being_prepared;

} threaded_data;
};

Expand Down
42 changes: 23 additions & 19 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ struct aws_s3_client *aws_s3_client_new(
}

aws_atomic_init_int(&client->stats.num_requests_stream_queued_waiting, 0);
aws_atomic_init_int(&client->stats.num_requests_streaming, 0);
aws_atomic_init_int(&client->stats.num_requests_streaming_response, 0);

*((uint32_t *)&client->max_active_connections_override) = client_config->max_active_connections_override;

Expand Down Expand Up @@ -616,6 +616,9 @@ uint32_t aws_s3_client_queue_requests_threaded(
bool queue_front) {
AWS_PRECONDITION(client);
AWS_PRECONDITION(request_list);
if (aws_linked_list_empty(request_list)) {
return 0;
}

uint32_t request_list_size = 0;

Expand Down Expand Up @@ -1279,28 +1282,33 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {

uint32_t num_requests_stream_queued_waiting =
(uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);

uint32_t num_requests_being_prepared = client->threaded_data.num_requests_being_prepared;

uint32_t num_requests_streaming_response =
(uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming_response);

uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
num_requests_streaming + client->threaded_data.num_requests_being_prepared +
num_requests_streaming_response + num_requests_being_prepared +
client->threaded_data.request_queue_size;
AWS_LOGF(
s_log_level_client_stats,
AWS_LS_S3_CLIENT_STATS,
"id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
"Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d Requests-streaming:%d "
"Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d "
"Requests-streaming-response:%d "
" Endpoints(in-table/allocated):%d/%d",
(void *)client,
total_approx_requests,
num_requests_tracked_requests,
client->threaded_data.num_requests_being_prepared,
num_requests_being_prepared,
client->threaded_data.request_queue_size,
num_auto_ranged_get_network_io,
num_auto_ranged_put_network_io,
num_auto_default_network_io,
num_requests_network_io,
num_requests_stream_queued_waiting,
num_requests_streaming,
num_requests_streaming_response,
num_endpoints_in_table,
num_endpoints_allocated);
}
Expand Down Expand Up @@ -1475,12 +1483,8 @@ static void s_s3_client_prepare_callback_queue_request(
struct aws_s3_client *client = user_data;
AWS_PRECONDITION(client);

bool request_is_noop = false;

if (error_code != AWS_ERROR_SUCCESS || request->is_noop) {
request_is_noop = request->is_noop != 0;
if (error_code != AWS_ERROR_SUCCESS) {
s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);

request = aws_s3_request_release(request);
}

Expand All @@ -1489,9 +1493,7 @@ static void s_s3_client_prepare_callback_queue_request(
aws_s3_client_lock_synced_data(client);

if (error_code == AWS_ERROR_SUCCESS) {
if (!request_is_noop) {
aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node);
}
aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node);
} else {
++client->synced_data.num_failed_prepare_requests;
}
Expand All @@ -1515,12 +1517,14 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {

struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, request->meta_request);

/* Unless the request is marked "always send", if this meta request has a finish result, then finish the request
* now and release it. */
if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
if (request->is_noop) {
/* If the request is a no-op, finish it and clean it up. */
s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_SUCCESS);
request = aws_s3_request_release(request);
} else if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
/* Unless the request is marked "always send", if this meta request has a finish result, then finish the
* request now and release it. */
s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);

request = aws_s3_request_release(request);
} else if (
s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
Expand Down
3 changes: 2 additions & 1 deletion source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,7 @@ void aws_s3_meta_request_stream_response_body_synced(
return;
}

aws_atomic_fetch_add(&client->stats.num_requests_streaming, num_streaming_requests);
aws_atomic_fetch_add(&client->stats.num_requests_streaming_response, num_streaming_requests);
aws_atomic_fetch_sub(&client->stats.num_requests_stream_queued_waiting, num_streaming_requests);

meta_request->synced_data.num_parts_delivery_sent += num_streaming_requests;
Expand Down Expand Up @@ -1484,6 +1484,7 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
aws_error_str(error_code));
}
}
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);

++num_parts_delivered;
aws_s3_request_release(request);
Expand Down
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ add_net_test_case(test_s3_create_multipart_upload_message_with_content_md5)
add_net_test_case(test_s3_complete_multipart_message_with_content_md5)
add_net_test_case(test_s3_put_object_double_slashes)
add_net_test_case(test_s3_put_object_no_content_length)
add_net_test_case(test_s3_put_large_object_no_content_length_with_checksum)
add_net_test_case(test_s3_put_object_single_part_no_content_length)
add_net_test_case(test_s3_put_object_zero_size_no_content_length)
add_net_test_case(test_s3_put_large_object_no_content_length_with_checksum)
add_net_test_case(test_s3_put_object_no_content_length_multiple)
add_net_test_case(test_s3_put_object_async_singlepart)
add_net_test_case(test_s3_put_object_async_multipart)
add_net_test_case(test_s3_put_object_async_read_completes_synchronously)
Expand Down
56 changes: 56 additions & 0 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2188,6 +2188,62 @@ static int s_test_s3_put_large_object_no_content_length_with_checksum(struct aws
return 0;
}

/**
 * Regression test: uploads without a content-length used to hang after the client sat idle for a while.
 *
 * The bug: without content-length, more requests were scheduled for preparation than were actually needed. The
 * extra requests were cleaned up, but the client-level `num_requests_being_prepared` count still kept a record of
 * them.
 *
 * To reproduce, send a batch of uploads whose bodies are each smaller than one part, then sleep long enough for DNS
 * resolution to purge all records (otherwise there is always one valid request available to send). That pushes the
 * client onto its "not going full speed" code path, where the next upload used to hang.
 */
AWS_TEST_CASE(test_s3_put_object_no_content_length_multiple, s_test_s3_put_object_no_content_length_multiple)
static int s_test_s3_put_object_no_content_length_multiple(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;

    struct aws_s3_tester tester;
    ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

    /* 8 MB parts, so the 1 MB bodies sent below are each smaller than a single part. */
    struct aws_s3_client_config client_config = {
        .part_size = MB_TO_BYTES(8),
    };
    ASSERT_SUCCESS(aws_s3_tester_bind_client(
        &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING));

    struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

    /* TTL of 55 (presumably seconds - TODO confirm); the idle period below has to exceed it. */
    aws_s3_set_dns_ttl(55);

    ASSERT_TRUE(client != NULL);

    /* Upload description shared by every meta request in this test: 1 MB body, no content-length header. */
    struct aws_s3_tester_meta_request_options put_options = {
        .allocator = allocator,
        .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
        .client = client,
        .checksum_algorithm = AWS_SCA_CRC32,
        .put_options = {.object_size_mb = 1, .skip_content_length = true},
    };

    /* First wave: six uploads, sent one after another. */
    int uploads_left = 6;
    while (uploads_left-- > 0) {
        ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL));
    }

    /* Stay idle for longer than the DNS ttl so that all records get purged. */
    const uint64_t idle_period_ns = aws_timestamp_convert(60, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
    aws_thread_current_sleep(idle_period_ns);

    /* The upload issued after the idle period is the one that used to hang. */
    ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL));

    aws_s3_client_release(client);

    aws_s3_tester_clean_up(&tester);

    return 0;
}

/* Test async-input-stream when we're not doing multipart upload */
AWS_TEST_CASE(test_s3_put_object_async_singlepart, s_test_s3_put_object_async_singlepart)
static int s_test_s3_put_object_async_singlepart(struct aws_allocator *allocator, void *ctx) {
Expand Down
14 changes: 13 additions & 1 deletion tests/s3_tester.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester
}

aws_atomic_init_int(&mock_client->stats.num_requests_stream_queued_waiting, 0);
aws_atomic_init_int(&mock_client->stats.num_requests_streaming, 0);
aws_atomic_init_int(&mock_client->stats.num_requests_streaming_response, 0);

return mock_client;
}
Expand Down Expand Up @@ -1335,6 +1335,14 @@ int aws_s3_tester_client_new(
return AWS_OP_SUCCESS;
}

/*
 * Checks that the client's `threaded_data` bookkeeping has fully drained: both the number of requests being
 * prepared and the request queue size must be zero.
 *
 * Returns AWS_OP_SUCCESS when both checks pass.
 * NOTE(review): the ASSERT_* macros presumably return a failure code from this function on mismatch - confirm
 * against the test harness.
 *
 * TSAN is disabled for this function because it hacks into the client's `threaded_data`, reading the fields
 * directly with no lock taken here.
 */
AWS_SUPPRESS_TSAN
static int s_tester_check_client_thread_data(struct aws_s3_client *client) {
/* No requests may still be counted as "being prepared". */
ASSERT_UINT_EQUALS(0, client->threaded_data.num_requests_being_prepared);
/* The request queue must be empty as well. */
ASSERT_UINT_EQUALS(0, client->threaded_data.request_queue_size);
return AWS_OP_SUCCESS;
}

int aws_s3_tester_send_meta_request_with_options(
struct aws_s3_tester *tester,
struct aws_s3_tester_meta_request_options *options,
Expand Down Expand Up @@ -1689,6 +1697,10 @@ int aws_s3_tester_send_meta_request_with_options(
ASSERT_UINT_EQUALS(upload_size_bytes, out_results->progress.content_length);
}
}
ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_in_flight));
ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting));
ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_streaming_response));
ASSERT_SUCCESS(s_tester_check_client_thread_data(client));
break;
case AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE:
ASSERT_FALSE(out_results->finished_error_code == AWS_ERROR_SUCCESS);
Expand Down

0 comments on commit 1fe3464

Please sign in to comment.