diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 9738eb180..d5d3fcb8e 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -226,8 +226,8 @@ struct aws_s3_meta_request { /* True if the finish result has been set. */ uint32_t finish_result_set : 1; - /* To track the aws_s3_request that are active from HTTP level */ - struct aws_linked_list ongoing_http_requests_list; + /* To track aws_s3_requests with cancellable HTTP streams */ + struct aws_linked_list cancellable_http_streams_list; } synced_data; @@ -367,8 +367,8 @@ void aws_s3_meta_request_add_event_for_delivery_synced( * The meta-request's finish callback must not be invoked until this returns false. */ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request); -/* Cancel the requests with ongoing HTTP activities for the meta request */ -void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code); +/* Cancel the requests with cancellable HTTP stream for the meta request */ +void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code); /* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex, * as reading from the stream could cause user code to call back into aws-c-s3. diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h index e43996647..b77cf5231 100644 --- a/include/aws/s3/private/s3_request.h +++ b/include/aws/s3/private/s3_request.h @@ -114,12 +114,12 @@ struct aws_s3_request { struct aws_linked_list_node node; /* Linked list node used for tracking the request is active from HTTP level. */ - struct aws_linked_list_node ongoing_http_requests_list_node; + struct aws_linked_list_node cancellable_http_streams_list_node; /* The meta request lock must be held to access the data */ struct { /* The underlying http stream, only valid when the request is active from HTTP level */ - struct aws_http_stream *http_stream; + struct aws_http_stream *cancellable_http_stream; } synced_data; /* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request, diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index e0f7ef972..941d8ae81 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -1671,7 +1671,7 @@ static int s_s3_auto_ranged_put_pause( */ aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED); - aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_PAUSED); + aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_PAUSED); /* unlock */ aws_s3_meta_request_unlock_synced_data(meta_request); diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 88783c27a..de6aa0055 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -203,7 +203,7 @@ int aws_s3_meta_request_init_base( meta_request->type = options->type; /* Set up reference count. */ aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy); - aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list); + aws_linked_list_init(&meta_request->synced_data.cancellable_http_streams_list); if (part_size == SIZE_MAX) { aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); @@ -346,7 +346,7 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) { /* BEGIN CRITICAL SECTION */ aws_s3_meta_request_lock_synced_data(meta_request); aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED); - aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_CANCELED); + aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_CANCELED); aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ } @@ -488,7 +488,7 @@ static void s_s3_meta_request_destroy(void *user_data) { AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0); aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array); - AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)); + AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list)); aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result); @@ -1071,28 +1071,51 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request, AWS_LOGF_TRACE(AWS_LS_S3_META_REQUEST, "id=%p: Sending request %p", (void *)meta_request, (void *)request); - if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) { - aws_http_stream_release(stream); - stream = NULL; - - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, "id=%p: Could not activate HTTP stream %p", (void *)meta_request, (void *)request); - - goto error_finish; - } - - { + if (!request->always_send) { /* BEGIN CRITICAL SECTION */ aws_s3_meta_request_lock_synced_data(meta_request); + if (aws_s3_meta_request_has_finish_result_synced(meta_request)) { + /* The meta request has finish result already, for this request, treat it as canceled. */ + aws_raise_error(AWS_ERROR_S3_CANCELED); + aws_s3_meta_request_unlock_synced_data(meta_request); + goto error_finish; + } + + /* Activate the stream within the lock as once the activate invoked, the HTTP level callback can happen right + * after. */ + if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) { + aws_s3_meta_request_unlock_synced_data(meta_request); + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p: Could not activate HTTP stream %p", + (void *)meta_request, + (void *)request); + goto error_finish; + } aws_linked_list_push_back( - &meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node); - request->synced_data.http_stream = stream; + &meta_request->synced_data.cancellable_http_streams_list, &request->cancellable_http_streams_list_node); + request->synced_data.cancellable_http_stream = stream; + aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ + } else { + /* If the request always send, it is not cancellable. We simply activate the stream. */ + if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) { + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p: Could not activate HTTP stream %p", + (void *)meta_request, + (void *)request); + goto error_finish; + } } return; error_finish: + if (stream) { + aws_http_stream_release(stream); + stream = NULL; + } s_s3_meta_request_send_request_finish(connection, NULL, aws_last_error_or_unknown()); } @@ -1385,14 +1408,16 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in if (meta_request->checksum_config.validate_response_checksum) { s_get_response_part_finish_checksum_helper(connection, error_code); } - if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) { - /* BEGIN CRITICAL SECTION */ + /* BEGIN CRITICAL SECTION */ + { aws_s3_meta_request_lock_synced_data(meta_request); - AWS_ASSERT(request->synced_data.http_stream != NULL); - aws_linked_list_remove(&request->ongoing_http_requests_list_node); + if (request->synced_data.cancellable_http_stream) { + aws_linked_list_remove(&request->cancellable_http_streams_list_node); + request->synced_data.cancellable_http_stream = NULL; + } aws_s3_meta_request_unlock_synced_data(meta_request); - /* END CRITICAL SECTION */ } + /* END CRITICAL SECTION */ s_s3_meta_request_send_request_finish(connection, stream, error_code); } @@ -1668,19 +1693,40 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r meta_request->synced_data.event_delivery_active; } -void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) { +void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) { ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); - while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) { + while (!aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list)) { struct aws_linked_list_node *request_node = - aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list); + aws_linked_list_pop_front(&meta_request->synced_data.cancellable_http_streams_list); struct aws_s3_request *request = - AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node); - if (!request->always_send) { - /* Cancel the ongoing http stream, unless it's always send. */ - aws_http_stream_cancel(request->synced_data.http_stream, error_code); + AWS_CONTAINER_OF(request_node, struct aws_s3_request, cancellable_http_streams_list_node); + AWS_ASSERT(!request->always_send); + + aws_http_stream_cancel(request->synced_data.cancellable_http_stream, error_code); + request->synced_data.cancellable_http_stream = NULL; + } +} + +static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics( + struct aws_s3_request_metrics *metrics, + struct aws_s3_meta_request *meta_request) { + + if (metrics != NULL) { + /* Request is done streaming the body, complete the metrics for the request now. */ + + if (metrics->time_metrics.end_timestamp_ns == -1) { + aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns); + metrics->time_metrics.total_duration_ns = + metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns; + } + + if (meta_request->telemetry_callback != NULL) { + /* We already in the meta request event thread, invoke the telemetry callback directly */ + meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data); } - request->synced_data.http_stream = NULL; + aws_s3_request_metrics_release(metrics); } + return NULL; } /* Deliver events in event_delivery_array. @@ -1750,21 +1796,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1); ++num_parts_delivered; - - if (request->send_data.metrics != NULL) { - /* Request is done streaming the body, complete the metrics for the request now. */ - struct aws_s3_request_metrics *metrics = request->send_data.metrics; - metrics->crt_info_metrics.error_code = error_code; - aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns); - metrics->time_metrics.total_duration_ns = - metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns; - - if (meta_request->telemetry_callback != NULL) { - /* We already in the meta request event thread, invoke the telemetry callback directly */ - meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data); - } - request->send_data.metrics = aws_s3_request_metrics_release(metrics); - } + request->send_data.metrics = + s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request); aws_s3_request_release(request); } break; @@ -1804,13 +1837,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL); AWS_FATAL_ASSERT(metrics != NULL); - if (metrics->time_metrics.end_timestamp_ns == -1) { - aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns); - metrics->time_metrics.total_duration_ns = - metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns; - } - meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data); - event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics); + event.u.telemetry.metrics = + s_s3_request_finish_up_and_release_metrics(event.u.telemetry.metrics, meta_request); } break; default: @@ -1935,6 +1963,10 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list); struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node); AWS_FATAL_ASSERT(release_request != NULL); + /* This pending-body-streaming request was never moved to the event-delivery queue, + * so its metrics were never finished. Finish them now. */ + release_request->send_data.metrics = + s_s3_request_finish_up_and_release_metrics(release_request->send_data.metrics, meta_request); aws_s3_request_release(release_request); } diff --git a/source/s3_util.c b/source/s3_util.c index 1747a9524..69ac22100 100644 --- a/source/s3_util.c +++ b/source/s3_util.c @@ -636,6 +636,7 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) { AWS_PRECONDITION(meta_request); AWS_PRECONDITION(request); + ASSERT_SYNCED_DATA_LOCK_HELD(meta_request); if (request->send_data.metrics != NULL) { /* Request is done, complete the metrics for the request now. */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 04c8fb9b6..92b1ce6d8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -49,8 +49,8 @@ add_net_test_case(test_s3_cancel_mpu_create_completed) add_net_test_case(test_s3_cancel_mpu_one_part_completed) add_net_test_case(test_s3_cancel_mpu_one_part_completed_async) add_net_test_case(test_s3_cancel_mpu_all_parts_completed) -add_net_test_case(test_s3_cancel_mpu_ongoing_http_requests) -add_net_test_case(test_s3_pause_mpu_ongoing_http_requests) +add_net_test_case(test_s3_cancel_mpu_cancellable_requests) +add_net_test_case(test_s3_pause_mpu_cancellable_requests) add_net_test_case(test_s3_cancel_mpd_nothing_sent) add_net_test_case(test_s3_cancel_mpd_one_part_sent) add_net_test_case(test_s3_cancel_mpd_one_part_completed) @@ -59,6 +59,7 @@ add_net_test_case(test_s3_cancel_mpd_head_object_sent) add_net_test_case(test_s3_cancel_mpd_head_object_completed) add_net_test_case(test_s3_cancel_mpd_get_without_range_sent) add_net_test_case(test_s3_cancel_mpd_get_without_range_completed) +add_net_test_case(test_s3_cancel_mpd_pending_streaming) add_net_test_case(test_s3_cancel_prepare) add_net_test_case(test_s3_get_object_tls_disabled) diff --git a/tests/s3_cancel_tests.c b/tests/s3_cancel_tests.c index c2e83f411..eb1a0ec23 100644 --- a/tests/s3_cancel_tests.c +++ b/tests/s3_cancel_tests.c @@ -30,6 +30,7 @@ enum s3_update_cancel_type { S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT, S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED, S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED, + S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING, }; struct s3_cancel_test_user_data { @@ -78,7 +79,7 @@ static bool s_s3_meta_request_update_cancel_test( break; case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS: - call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list); + call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list); break; case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES: @@ -122,6 +123,11 @@ static bool s_s3_meta_request_update_cancel_test( /* Prevent other parts from being queued while we wait for these two to complete. */ block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2; break; + + case S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING: + call_cancel_or_pause = + aws_priority_queue_size(&meta_request->synced_data.pending_body_streaming_requests) > 0; + break; } aws_s3_meta_request_unlock_synced_data(meta_request); @@ -299,7 +305,9 @@ static int s3_cancel_test_helper_ex( .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE, .get_options = { - .object_path = g_pre_existing_object_1MB, + /* Note 1: 10MB object with 16KB parts, so that tests have many requests in-flight. + * We want to try and stress stuff like parts arriving out of order. */ + .object_path = g_pre_existing_object_10MB, }, }; @@ -527,8 +535,8 @@ static int s_test_s3_cancel_mpu_all_parts_completed(struct aws_allocator *alloca return 0; } -AWS_TEST_CASE(test_s3_cancel_mpu_ongoing_http_requests, s_test_s3_cancel_mpu_ongoing_http_requests) -static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) { +AWS_TEST_CASE(test_s3_cancel_mpu_cancellable_requests, s_test_s3_cancel_mpu_cancellable_requests) +static int s_test_s3_cancel_mpu_cancellable_requests(struct aws_allocator *allocator, void *ctx) { (void)ctx; ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS)); @@ -536,8 +544,8 @@ static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allo return 0; } -AWS_TEST_CASE(test_s3_pause_mpu_ongoing_http_requests, s_test_s3_pause_mpu_ongoing_http_requests) -static int s_test_s3_pause_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) { +AWS_TEST_CASE(test_s3_pause_mpu_cancellable_requests, s_test_s3_pause_mpu_cancellable_requests) +static int s_test_s3_pause_mpu_cancellable_requests(struct aws_allocator *allocator, void *ctx) { (void)ctx; ASSERT_SUCCESS(s3_cancel_test_helper_ex( @@ -618,6 +626,15 @@ static int s_test_s3_cancel_mpd_get_without_range_completed(struct aws_allocator return 0; } +AWS_TEST_CASE(test_s3_cancel_mpd_pending_streaming, s_test_s3_cancel_mpd_pending_streaming) +static int s_test_s3_cancel_mpd_pending_streaming(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING)); + + return 0; +} + struct test_s3_cancel_prepare_user_data { uint32_t request_prepare_counters[AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_MAX]; };