diff --git a/include/aws/s3/private/s3_client_impl.h b/include/aws/s3/private/s3_client_impl.h index 634c12133..14aec8c6c 100644 --- a/include/aws/s3/private/s3_client_impl.h +++ b/include/aws/s3/private/s3_client_impl.h @@ -283,8 +283,8 @@ struct aws_s3_client { /* Number of requests sitting in their meta request priority queue, waiting to be streamed. */ struct aws_atomic_var num_requests_stream_queued_waiting; - /* Number of requests currently scheduled to be streamed or are actively being streamed. */ - struct aws_atomic_var num_requests_streaming; + /* Number of requests currently scheduled to be streamed the response body or are actively being streamed. */ + struct aws_atomic_var num_requests_streaming_response; } stats; struct { @@ -344,7 +344,6 @@ struct aws_s3_client { /* Number of requests currently being prepared. */ uint32_t num_requests_being_prepared; - } threaded_data; }; diff --git a/source/s3_client.c b/source/s3_client.c index abe9b6e44..81653fba4 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -276,7 +276,7 @@ struct aws_s3_client *aws_s3_client_new( } aws_atomic_init_int(&client->stats.num_requests_stream_queued_waiting, 0); - aws_atomic_init_int(&client->stats.num_requests_streaming, 0); + aws_atomic_init_int(&client->stats.num_requests_streaming_response, 0); *((uint32_t *)&client->max_active_connections_override) = client_config->max_active_connections_override; @@ -616,6 +616,9 @@ uint32_t aws_s3_client_queue_requests_threaded( bool queue_front) { AWS_PRECONDITION(client); AWS_PRECONDITION(request_list); + if (aws_linked_list_empty(request_list)) { + return 0; + } uint32_t request_list_size = 0; @@ -1279,28 +1282,33 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) { uint32_t num_requests_stream_queued_waiting = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting); - uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming); + + uint32_t num_requests_being_prepared = client->threaded_data.num_requests_being_prepared; + + uint32_t num_requests_streaming_response = + (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming_response); uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting + - num_requests_streaming + client->threaded_data.num_requests_being_prepared + + num_requests_streaming_response + num_requests_being_prepared + client->threaded_data.request_queue_size; AWS_LOGF( s_log_level_client_stats, AWS_LS_S3_CLIENT_STATS, "id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d " - "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d Requests-streaming:%d " + "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d " + "Requests-streaming-response:%d " " Endpoints(in-table/allocated):%d/%d", (void *)client, total_approx_requests, num_requests_tracked_requests, - client->threaded_data.num_requests_being_prepared, + num_requests_being_prepared, client->threaded_data.request_queue_size, num_auto_ranged_get_network_io, num_auto_ranged_put_network_io, num_auto_default_network_io, num_requests_network_io, num_requests_stream_queued_waiting, - num_requests_streaming, + num_requests_streaming_response, num_endpoints_in_table, num_endpoints_allocated); } @@ -1475,12 +1483,8 @@ static void s_s3_client_prepare_callback_queue_request( struct aws_s3_client *client = user_data; AWS_PRECONDITION(client); - bool request_is_noop = false; - - if (error_code != AWS_ERROR_SUCCESS || request->is_noop) { - request_is_noop = request->is_noop != 0; + if (error_code != AWS_ERROR_SUCCESS) { s_s3_client_meta_request_finished_request(client, meta_request, request, error_code); - request = aws_s3_request_release(request); } @@ -1489,9 +1493,7 @@ static void s_s3_client_prepare_callback_queue_request( aws_s3_client_lock_synced_data(client); if (error_code == AWS_ERROR_SUCCESS) { - if (!request_is_noop) { - aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node); - } + aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node); } else { ++client->synced_data.num_failed_prepare_requests; } @@ -1515,12 +1517,14 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) { struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client); const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, request->meta_request); - - /* Unless the request is marked "always send", if this meta request has a finish result, then finish the request - * now and release it. */ - if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) { + if (request->is_noop) { + /* If request is no-op, finishes and cleans up the request */ + s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_SUCCESS); + request = aws_s3_request_release(request); + } else if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) { + /* Unless the request is marked "always send", if this meta request has a finish result, then finish the + * request now and release it. */ s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED); - request = aws_s3_request_release(request); } else if ( s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) { diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index f2216c59f..1cf7f737f 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -1386,7 +1386,7 @@ void aws_s3_meta_request_stream_response_body_synced( return; } - aws_atomic_fetch_add(&client->stats.num_requests_streaming, num_streaming_requests); + aws_atomic_fetch_add(&client->stats.num_requests_streaming_response, num_streaming_requests); aws_atomic_fetch_sub(&client->stats.num_requests_stream_queued_waiting, num_streaming_requests); meta_request->synced_data.num_parts_delivery_sent += num_streaming_requests; @@ -1484,6 +1484,7 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a aws_error_str(error_code)); } } + aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1); ++num_parts_delivered; aws_s3_request_release(request); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e4137e7a3..45efcd227 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -108,9 +108,10 @@ add_net_test_case(test_s3_create_multipart_upload_message_with_content_md5) add_net_test_case(test_s3_complete_multipart_message_with_content_md5) add_net_test_case(test_s3_put_object_double_slashes) add_net_test_case(test_s3_put_object_no_content_length) -add_net_test_case(test_s3_put_large_object_no_content_length_with_checksum) add_net_test_case(test_s3_put_object_single_part_no_content_length) add_net_test_case(test_s3_put_object_zero_size_no_content_length) +add_net_test_case(test_s3_put_large_object_no_content_length_with_checksum) +add_net_test_case(test_s3_put_object_no_content_length_multiple) add_net_test_case(test_s3_put_object_async_singlepart) add_net_test_case(test_s3_put_object_async_multipart) add_net_test_case(test_s3_put_object_async_read_completes_synchronously) diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 2d168ed72..11ae454ee 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -2188,6 +2188,62 @@ static int s_test_s3_put_large_object_no_content_length_with_checksum(struct aws return 0; } +/** + * Once upon a time, we have a bug that without content-length, we will schedule more requests to prepare than needed. + * And those extra request will be cleaned up, however, the client level count of `num_requests_being_prepared` will + * still keep record for those. + * + * To reproduce, we create bunch of requests with less than a part body. And then sleep for a while to let dns resolve + * purge all records. (Otherwise, we will always have one valid request to be available to send.) to trigger not going + * full speed code. And we will hang. + * + */ +AWS_TEST_CASE(test_s3_put_object_no_content_length_multiple, s_test_s3_put_object_no_content_length_multiple) +static int s_test_s3_put_object_no_content_length_multiple(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + + struct aws_s3_client_config client_config = { + .part_size = MB_TO_BYTES(8), + }; + + ASSERT_SUCCESS(aws_s3_tester_bind_client( + &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING)); + + struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config); + + aws_s3_set_dns_ttl(55); + + ASSERT_TRUE(client != NULL); + struct aws_s3_tester_meta_request_options put_options = { + .allocator = allocator, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, + .client = client, + .checksum_algorithm = AWS_SCA_CRC32, + .put_options = + { + .object_size_mb = 1, + .skip_content_length = true, + }, + }; + for (int i = 0; i < 6; i++) { + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL)); + } + /* Sleep more than the DNS ttl to purge all records. */ + aws_thread_current_sleep(aws_timestamp_convert(60, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + /* After sleep for a while, make another meta request */ + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL)); + + aws_s3_client_release(client); + + aws_s3_tester_clean_up(&tester); + + return 0; +} + /* Test async-input-stream when we're not doing multipart upload */ AWS_TEST_CASE(test_s3_put_object_async_singlepart, s_test_s3_put_object_async_singlepart) static int s_test_s3_put_object_async_singlepart(struct aws_allocator *allocator, void *ctx) { diff --git a/tests/s3_tester.c b/tests/s3_tester.c index 2ccf3d70e..76d1044e5 100644 --- a/tests/s3_tester.c +++ b/tests/s3_tester.c @@ -810,7 +810,7 @@ struct aws_s3_client *aws_s3_tester_mock_client_new(struct aws_s3_tester *tester } aws_atomic_init_int(&mock_client->stats.num_requests_stream_queued_waiting, 0); - aws_atomic_init_int(&mock_client->stats.num_requests_streaming, 0); + aws_atomic_init_int(&mock_client->stats.num_requests_streaming_response, 0); return mock_client; } @@ -1335,6 +1335,14 @@ int aws_s3_tester_client_new( return AWS_OP_SUCCESS; } +/* Disable tsan as we hack into the client threaded data */ +AWS_SUPPRESS_TSAN +static int s_tester_check_client_thread_data(struct aws_s3_client *client) { + ASSERT_UINT_EQUALS(0, client->threaded_data.num_requests_being_prepared); + ASSERT_UINT_EQUALS(0, client->threaded_data.request_queue_size); + return AWS_OP_SUCCESS; +} + int aws_s3_tester_send_meta_request_with_options( struct aws_s3_tester *tester, struct aws_s3_tester_meta_request_options *options, @@ -1689,6 +1697,10 @@ int aws_s3_tester_send_meta_request_with_options( ASSERT_UINT_EQUALS(upload_size_bytes, out_results->progress.content_length); } } + ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_in_flight)); + ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting)); + ASSERT_UINT_EQUALS(0, aws_atomic_load_int(&client->stats.num_requests_streaming_response)); + ASSERT_SUCCESS(s_tester_check_client_thread_data(client)); break; case AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE: ASSERT_FALSE(out_results->finished_error_code == AWS_ERROR_SUCCESS);