Skip to content

Commit

Permalink
fix two bugs from metrics and cancel (#399)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
TingDaoK and graebm authored Dec 22, 2023
1 parent f1ba15e commit 20097d3
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 66 deletions.
8 changes: 4 additions & 4 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ struct aws_s3_meta_request {
/* True if the finish result has been set. */
uint32_t finish_result_set : 1;

/* To track the aws_s3_request that are active from HTTP level */
struct aws_linked_list ongoing_http_requests_list;
/* To track aws_s3_requests with cancellable HTTP streams */
struct aws_linked_list cancellable_http_streams_list;

} synced_data;

Expand Down Expand Up @@ -367,8 +367,8 @@ void aws_s3_meta_request_add_event_for_delivery_synced(
* The meta-request's finish callback must not be invoked until this returns false. */
bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);

/* Cancel the requests with ongoing HTTP activities for the meta request */
void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);
/* Cancel the requests with cancellable HTTP stream for the meta request */
void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);

/* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
* as reading from the stream could cause user code to call back into aws-c-s3.
Expand Down
4 changes: 2 additions & 2 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ struct aws_s3_request {
struct aws_linked_list_node node;

/* Linked list node used for tracking the request is active from HTTP level. */
struct aws_linked_list_node ongoing_http_requests_list_node;
struct aws_linked_list_node cancellable_http_streams_list_node;

/* The meta request lock must be held to access the data */
struct {
/* The underlying http stream, only valid when the request is active from HTTP level */
struct aws_http_stream *http_stream;
struct aws_http_stream *cancellable_http_stream;
} synced_data;

/* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request,
Expand Down
2 changes: 1 addition & 1 deletion source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -1671,7 +1671,7 @@ static int s_s3_auto_ranged_put_pause(
*/
aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED);

aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_PAUSED);
aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_PAUSED);

/* unlock */
aws_s3_meta_request_unlock_synced_data(meta_request);
Expand Down
134 changes: 83 additions & 51 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ int aws_s3_meta_request_init_base(
meta_request->type = options->type;
/* Set up reference count. */
aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy);
aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list);
aws_linked_list_init(&meta_request->synced_data.cancellable_http_streams_list);

if (part_size == SIZE_MAX) {
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
Expand Down Expand Up @@ -346,7 +346,7 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
Expand Down Expand Up @@ -488,7 +488,7 @@ static void s_s3_meta_request_destroy(void *user_data) {
AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0);
aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array);

AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list));
AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list));

aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result);

Expand Down Expand Up @@ -1071,28 +1071,51 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,

AWS_LOGF_TRACE(AWS_LS_S3_META_REQUEST, "id=%p: Sending request %p", (void *)meta_request, (void *)request);

if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
aws_http_stream_release(stream);
stream = NULL;

AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST, "id=%p: Could not activate HTTP stream %p", (void *)meta_request, (void *)request);

goto error_finish;
}

{
if (!request->always_send) {
/* BEGIN CRITICAL SECTION */
aws_s3_meta_request_lock_synced_data(meta_request);
if (aws_s3_meta_request_has_finish_result_synced(meta_request)) {
/* The meta request has finish result already, for this request, treat it as canceled. */
aws_raise_error(AWS_ERROR_S3_CANCELED);
aws_s3_meta_request_unlock_synced_data(meta_request);
goto error_finish;
}

/* Activate the stream within the lock as once the activate invoked, the HTTP level callback can happen right
* after. */
if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
aws_s3_meta_request_unlock_synced_data(meta_request);
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Could not activate HTTP stream %p",
(void *)meta_request,
(void *)request);
goto error_finish;
}
aws_linked_list_push_back(
&meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node);
request->synced_data.http_stream = stream;
&meta_request->synced_data.cancellable_http_streams_list, &request->cancellable_http_streams_list_node);
request->synced_data.cancellable_http_stream = stream;

aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
} else {
/* If the request always send, it is not cancellable. We simply activate the stream. */
if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Could not activate HTTP stream %p",
(void *)meta_request,
(void *)request);
goto error_finish;
}
}
return;

error_finish:
if (stream) {
aws_http_stream_release(stream);
stream = NULL;
}

s_s3_meta_request_send_request_finish(connection, NULL, aws_last_error_or_unknown());
}
Expand Down Expand Up @@ -1385,14 +1408,16 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in
if (meta_request->checksum_config.validate_response_checksum) {
s_get_response_part_finish_checksum_helper(connection, error_code);
}
if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) {
/* BEGIN CRITICAL SECTION */
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
AWS_ASSERT(request->synced_data.http_stream != NULL);
aws_linked_list_remove(&request->ongoing_http_requests_list_node);
if (request->synced_data.cancellable_http_stream) {
aws_linked_list_remove(&request->cancellable_http_streams_list_node);
request->synced_data.cancellable_http_stream = NULL;
}
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
}
/* END CRITICAL SECTION */
s_s3_meta_request_send_request_finish(connection, stream, error_code);
}

Expand Down Expand Up @@ -1668,19 +1693,40 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
meta_request->synced_data.event_delivery_active;
}

void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) {
while (!aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list)) {
struct aws_linked_list_node *request_node =
aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list);
aws_linked_list_pop_front(&meta_request->synced_data.cancellable_http_streams_list);
struct aws_s3_request *request =
AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node);
if (!request->always_send) {
/* Cancel the ongoing http stream, unless it's always send. */
aws_http_stream_cancel(request->synced_data.http_stream, error_code);
AWS_CONTAINER_OF(request_node, struct aws_s3_request, cancellable_http_streams_list_node);
AWS_ASSERT(!request->always_send);

aws_http_stream_cancel(request->synced_data.cancellable_http_stream, error_code);
request->synced_data.cancellable_http_stream = NULL;
}
}

static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics(
struct aws_s3_request_metrics *metrics,
struct aws_s3_meta_request *meta_request) {

if (metrics != NULL) {
/* Request is done streaming the body, complete the metrics for the request now. */

if (metrics->time_metrics.end_timestamp_ns == -1) {
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
}

if (meta_request->telemetry_callback != NULL) {
/* We already in the meta request event thread, invoke the telemetry callback directly */
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
}
request->synced_data.http_stream = NULL;
aws_s3_request_metrics_release(metrics);
}
return NULL;
}

/* Deliver events in event_delivery_array.
Expand Down Expand Up @@ -1750,21 +1796,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);

++num_parts_delivered;

if (request->send_data.metrics != NULL) {
/* Request is done streaming the body, complete the metrics for the request now. */
struct aws_s3_request_metrics *metrics = request->send_data.metrics;
metrics->crt_info_metrics.error_code = error_code;
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;

if (meta_request->telemetry_callback != NULL) {
/* We already in the meta request event thread, invoke the telemetry callback directly */
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
}
request->send_data.metrics = aws_s3_request_metrics_release(metrics);
}
request->send_data.metrics =
s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request);

aws_s3_request_release(request);
} break;
Expand Down Expand Up @@ -1804,13 +1837,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL);
AWS_FATAL_ASSERT(metrics != NULL);

if (metrics->time_metrics.end_timestamp_ns == -1) {
aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
metrics->time_metrics.total_duration_ns =
metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
}
meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics);
event.u.telemetry.metrics =
s_s3_request_finish_up_and_release_metrics(event.u.telemetry.metrics, meta_request);
} break;

default:
Expand Down Expand Up @@ -1935,6 +1963,10 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list);
struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
AWS_FATAL_ASSERT(release_request != NULL);
/* This pending-body-streaming request was never moved to the event-delivery queue,
* so its metrics were never finished. Finish them now. */
release_request->send_data.metrics =
s_s3_request_finish_up_and_release_metrics(release_request->send_data.metrics, meta_request);
aws_s3_request_release(release_request);
}

Expand Down
1 change: 1 addition & 0 deletions source/s3_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e
void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) {
AWS_PRECONDITION(meta_request);
AWS_PRECONDITION(request);
ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);

if (request->send_data.metrics != NULL) {
/* Request is done, complete the metrics for the request now. */
Expand Down
5 changes: 3 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ add_net_test_case(test_s3_cancel_mpu_create_completed)
add_net_test_case(test_s3_cancel_mpu_one_part_completed)
add_net_test_case(test_s3_cancel_mpu_one_part_completed_async)
add_net_test_case(test_s3_cancel_mpu_all_parts_completed)
add_net_test_case(test_s3_cancel_mpu_ongoing_http_requests)
add_net_test_case(test_s3_pause_mpu_ongoing_http_requests)
add_net_test_case(test_s3_cancel_mpu_cancellable_requests)
add_net_test_case(test_s3_pause_mpu_cancellable_requests)
add_net_test_case(test_s3_cancel_mpd_nothing_sent)
add_net_test_case(test_s3_cancel_mpd_one_part_sent)
add_net_test_case(test_s3_cancel_mpd_one_part_completed)
Expand All @@ -59,6 +59,7 @@ add_net_test_case(test_s3_cancel_mpd_head_object_sent)
add_net_test_case(test_s3_cancel_mpd_head_object_completed)
add_net_test_case(test_s3_cancel_mpd_get_without_range_sent)
add_net_test_case(test_s3_cancel_mpd_get_without_range_completed)
add_net_test_case(test_s3_cancel_mpd_pending_streaming)
add_net_test_case(test_s3_cancel_prepare)

add_net_test_case(test_s3_get_object_tls_disabled)
Expand Down
29 changes: 23 additions & 6 deletions tests/s3_cancel_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum s3_update_cancel_type {
S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT,
S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED,
S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING,
};

struct s3_cancel_test_user_data {
Expand Down Expand Up @@ -78,7 +79,7 @@ static bool s_s3_meta_request_update_cancel_test(
break;

case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS:
call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list);
call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list);
break;

case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES:
Expand Down Expand Up @@ -122,6 +123,11 @@ static bool s_s3_meta_request_update_cancel_test(
/* Prevent other parts from being queued while we wait for these two to complete. */
block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2;
break;

case S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING:
call_cancel_or_pause =
aws_priority_queue_size(&meta_request->synced_data.pending_body_streaming_requests) > 0;
break;
}

aws_s3_meta_request_unlock_synced_data(meta_request);
Expand Down Expand Up @@ -299,7 +305,9 @@ static int s3_cancel_test_helper_ex(
.validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE,
.get_options =
{
.object_path = g_pre_existing_object_1MB,
/* Note 1: 10MB object with 16KB parts, so that tests have many requests in-flight.
* We want to try and stress stuff like parts arriving out of order. */
.object_path = g_pre_existing_object_10MB,
},
};

Expand Down Expand Up @@ -527,17 +535,17 @@ static int s_test_s3_cancel_mpu_all_parts_completed(struct aws_allocator *alloca
return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpu_ongoing_http_requests, s_test_s3_cancel_mpu_ongoing_http_requests)
static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
AWS_TEST_CASE(test_s3_cancel_mpu_cancellable_requests, s_test_s3_cancel_mpu_cancellable_requests)
static int s_test_s3_cancel_mpu_cancellable_requests(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS));

return 0;
}

AWS_TEST_CASE(test_s3_pause_mpu_ongoing_http_requests, s_test_s3_pause_mpu_ongoing_http_requests)
static int s_test_s3_pause_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
AWS_TEST_CASE(test_s3_pause_mpu_cancellable_requests, s_test_s3_pause_mpu_cancellable_requests)
static int s_test_s3_pause_mpu_cancellable_requests(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper_ex(
Expand Down Expand Up @@ -618,6 +626,15 @@ static int s_test_s3_cancel_mpd_get_without_range_completed(struct aws_allocator
return 0;
}

AWS_TEST_CASE(test_s3_cancel_mpd_pending_streaming, s_test_s3_cancel_mpd_pending_streaming)
static int s_test_s3_cancel_mpd_pending_streaming(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING));

return 0;
}

struct test_s3_cancel_prepare_user_data {
uint32_t request_prepare_counters[AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_MAX];
};
Expand Down

0 comments on commit 20097d3

Please sign in to comment.