Skip to content

Commit

Permalink
Fix for abort multipart upload message not being sent (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
rccarper authored Apr 16, 2021
1 parent 2159aa8 commit cab7afd
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 30 deletions.
10 changes: 7 additions & 3 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ struct aws_http_message;
struct aws_signable;
struct aws_s3_meta_request;

enum aws_s3_request_desc_flags {
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS = 0x00000001,
AWS_S3_REQUEST_DESC_PART_SIZE_RESPONSE_BODY = 0x0000002,
enum aws_s3_request_flags {
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS = 0x00000001,
AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY = 0x00000002,
AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000004,
};

/* Represents a single request made to S3. */
Expand Down Expand Up @@ -86,6 +87,9 @@ struct aws_s3_request {

/* When true, this request is being tracked by the client for limiting the amount of in-flight-requests/stats. */
uint32_t tracked_by_client : 1;

/* When true, even when the meta request has a finish result set, this request will be sent. */
uint32_t always_send : 1;
};

AWS_EXTERN_C_BEGIN
Expand Down
6 changes: 3 additions & 3 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ static bool s_s3_auto_ranged_get_update(
meta_request,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART,
1,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_DESC_PART_SIZE_RESPONSE_BODY);
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);

++auto_ranged_get->synced_data.num_parts_requested;
goto has_work_remaining;
Expand All @@ -201,7 +201,7 @@ static bool s_s3_auto_ranged_get_update(
meta_request,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART_WITHOUT_RANGE,
0,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);

auto_ranged_get->synced_data.get_without_range_sent = true;
goto has_work_remaining;
Expand All @@ -218,7 +218,7 @@ static bool s_s3_auto_ranged_get_update(
meta_request,
AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART,
auto_ranged_get->synced_data.num_parts_requested + 1,
AWS_S3_REQUEST_DESC_PART_SIZE_RESPONSE_BODY);
AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);

++auto_ranged_get->synced_data.num_parts_requested;
goto has_work_remaining;
Expand Down
16 changes: 8 additions & 8 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ static bool s_s3_auto_ranged_put_update(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD,
0,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);

auto_ranged_put->synced_data.create_multipart_upload_sent = true;

Expand Down Expand Up @@ -187,7 +187,7 @@ static bool s_s3_auto_ranged_put_update(

/* Allocate a request for another part. */
request = aws_s3_request_new(
meta_request, AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART, 0, AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
meta_request, AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART, 0, AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);

request->part_number = auto_ranged_put->threaded_update_data.next_part_number;

Expand Down Expand Up @@ -221,7 +221,7 @@ static bool s_s3_auto_ranged_put_update(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD,
0,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);

auto_ranged_put->synced_data.complete_multipart_upload_sent = true;

Expand Down Expand Up @@ -285,7 +285,7 @@ static bool s_s3_auto_ranged_put_update(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD,
0,
AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_ALWAYS_SEND);

auto_ranged_put->synced_data.abort_multipart_upload_sent = true;

Expand Down Expand Up @@ -557,17 +557,17 @@ static void s_s3_auto_ranged_put_request_finished(
}
}

aws_s3_meta_request_lock_synced_data(meta_request);

++auto_ranged_put->synced_data.num_parts_completed;

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p: %d out of %d parts have completed.",
(void *)meta_request,
auto_ranged_put->synced_data.num_parts_completed,
auto_ranged_put->synced_data.total_num_parts);

aws_s3_meta_request_lock_synced_data(meta_request);

++auto_ranged_put->synced_data.num_parts_completed;

if (error_code == AWS_ERROR_SUCCESS) {
AWS_ASSERT(etag != NULL);

Expand Down
5 changes: 3 additions & 2 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1676,8 +1676,9 @@ void aws_s3_client_update_connections_threaded(struct aws_s3_client *client, boo
uint32_t num_conns_per_vip = s_num_conns_per_vip_meta_request_look_up[request->meta_request->type];
const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, num_conns_per_vip);

/* If this meta request was finished, then let the meta request know and release the request. */
if (aws_s3_meta_request_has_finish_result(request->meta_request)) {
/* Unless the request is marked "always send", if this meta request has a finish result, then finish the request
* now and release it. */
if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
/* Put the unused connection at the front of the list so that it is used in the next iteration.*/
aws_linked_list_push_front(&client->threaded_data.idle_vip_connections, &vip_connection->node);

Expand Down
2 changes: 1 addition & 1 deletion source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ static bool s_s3_meta_request_default_update(
goto has_work_remaining;
}

request = aws_s3_request_new(meta_request, 0, 1, AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
request = aws_s3_request_new(meta_request, 0, 1, AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
Expand Down
5 changes: 3 additions & 2 deletions source/s3_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ struct aws_s3_request *aws_s3_request_new(

request->request_tag = request_tag;
request->part_number = part_number;
request->record_response_headers = (flags & AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS) != 0;
request->part_size_response_body = (flags & AWS_S3_REQUEST_DESC_PART_SIZE_RESPONSE_BODY) != 0;
request->record_response_headers = (flags & AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS) != 0;
request->part_size_response_body = (flags & AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY) != 0;
request->always_send = (flags & AWS_S3_REQUEST_FLAG_ALWAYS_SEND) != 0;

return request;
}
Expand Down
36 changes: 32 additions & 4 deletions tests/s3_cancel_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ enum s3_update_cancel_type {

struct s3_cancel_test_user_data {
enum s3_update_cancel_type type;
bool abort_successful;
};

static bool s_s3_update_cancel_test(
static bool s_s3_meta_request_update_cancel_test(
struct aws_s3_meta_request *meta_request,
uint32_t flags,
struct aws_s3_request **out_request) {
Expand All @@ -44,6 +45,7 @@ static bool s_s3_update_cancel_test(
bool block_update = false;

aws_s3_meta_request_lock_synced_data(meta_request);

switch (cancel_test_user_data->type) {
case S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT:
call_cancel = auto_ranged_put->synced_data.create_multipart_upload_sent != 0;
Expand Down Expand Up @@ -89,6 +91,28 @@ static bool s_s3_update_cancel_test(
return original_meta_request_vtable->update(meta_request, flags, out_request);
}

static void s_s3_meta_request_finished_request_cancel_test(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request,
int error_code) {
AWS_ASSERT(meta_request);
AWS_ASSERT(request);

struct aws_s3_meta_request_test_results *results = meta_request->user_data;
struct aws_s3_tester *tester = results->tester;
struct s3_cancel_test_user_data *cancel_test_user_data = tester->user_data;

if (meta_request->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT &&
request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD) {
cancel_test_user_data->abort_successful = error_code == AWS_ERROR_SUCCESS;
}

struct aws_s3_meta_request_vtable *original_meta_request_vtable =
aws_s3_tester_get_meta_request_vtable_patch(tester, 0)->original_vtable;

original_meta_request_vtable->finished_request(meta_request, request, error_code);
}

static struct aws_s3_meta_request *s_meta_request_factory_patch_update_cancel_test(
struct aws_s3_client *client,
const struct aws_s3_meta_request_options *options) {
Expand All @@ -104,7 +128,8 @@ static struct aws_s3_meta_request *s_meta_request_factory_patch_update_cancel_te

struct aws_s3_meta_request_vtable *patched_meta_request_vtable =
aws_s3_tester_patch_meta_request_vtable(tester, meta_request, NULL);
patched_meta_request_vtable->update = s_s3_update_cancel_test;
patched_meta_request_vtable->update = s_s3_meta_request_update_cancel_test;
patched_meta_request_vtable->finished_request = s_s3_meta_request_finished_request_cancel_test;

return meta_request;
}
Expand Down Expand Up @@ -151,8 +176,11 @@ static int s3_cancel_test_helper(struct aws_allocator *allocator, enum s3_update

aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);

/* TODO: we need list-multipart-uploads to validate the cancel succeed, for now, we are using CLI to manually
* ensure the abort succeed */
if (cancel_type != S3_UPDATE_CANCEL_TYPE_MPU_CREATE_NOT_SENT) {
ASSERT_TRUE(test_user_data.abort_successful);
}

/* TODO: perform additional verification with list-multipart-uploads */

} else {

Expand Down
39 changes: 32 additions & 7 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ static int s_test_s3_request_create_destroy(struct aws_allocator *allocator, voi
ASSERT_TRUE(request_message != NULL);

struct aws_s3_request *request =
aws_s3_request_new(meta_request, request_tag, part_number, AWS_S3_REQUEST_DESC_RECORD_RESPONSE_HEADERS);
aws_s3_request_new(meta_request, request_tag, part_number, AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);

ASSERT_TRUE(request != NULL);

Expand Down Expand Up @@ -1001,7 +1001,7 @@ static void s_s3_test_meta_request_has_finish_result_finished_request(
++user_data->call_counter;
}

/* Test that the client will discard requests for meta requests that are trying to finish. */
/* Test that the client will correctly discard requests for meta requests that are trying to finish. */
AWS_TEST_CASE(test_s3_client_update_connections_finish_result, s_test_s3_client_update_connections_finish_result)
static int s_test_s3_client_update_connections_finish_result(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand All @@ -1026,7 +1026,6 @@ static int s_test_s3_client_update_connections_finish_result(struct aws_allocato
aws_s3_tester_patch_meta_request_vtable(&tester, mock_meta_request, NULL);
mock_meta_request_vtable->finished_request = s_s3_test_meta_request_has_finish_result_finished_request;

struct aws_s3_request *request = aws_s3_request_new(mock_meta_request, 0, 0, 0);
struct aws_s3_client *mock_client = aws_s3_tester_mock_client_new(&tester);

*((uint32_t *)&mock_client->ideal_vip_count) = 1;
Expand All @@ -1035,11 +1034,12 @@ static int s_test_s3_client_update_connections_finish_result(struct aws_allocato

aws_linked_list_init(&mock_client->threaded_data.request_queue);

aws_linked_list_push_back(&mock_client->threaded_data.request_queue, &request->node);
++mock_client->threaded_data.request_queue_size;

/* Because the meta request is finishing, the request should be released. */
/* Verify that the request does not get sent. */
{
struct aws_s3_request *request = aws_s3_request_new(mock_meta_request, 0, 0, 0);
aws_linked_list_push_back(&mock_client->threaded_data.request_queue, &request->node);
++mock_client->threaded_data.request_queue_size;

aws_s3_client_update_connections_threaded(mock_client, true);

ASSERT_TRUE(mock_client->threaded_data.request_queue_size == 0);
Expand All @@ -1052,6 +1052,31 @@ static int s_test_s3_client_update_connections_finish_result(struct aws_allocato
ASSERT_FALSE(vip_connection->is_warm);
ASSERT_FALSE(vip_connection->is_active);
ASSERT_TRUE(vip_connection->request == NULL);

test_update_connections_finish_result_user_data.request = 0;
test_update_connections_finish_result_user_data.call_counter = 0;
}

/* Verify that the request still gets sent because it has the 'always send' flag. */
{
struct aws_s3_request *request = aws_s3_request_new(mock_meta_request, 0, 0, AWS_S3_REQUEST_FLAG_ALWAYS_SEND);
aws_linked_list_push_back(&mock_client->threaded_data.request_queue, &request->node);
++mock_client->threaded_data.request_queue_size;

aws_s3_client_update_connections_threaded(mock_client, true);

ASSERT_TRUE(mock_client->threaded_data.request_queue_size == 0);
ASSERT_TRUE(aws_linked_list_empty(&mock_client->threaded_data.request_queue));
ASSERT_TRUE(aws_atomic_load_int(&mock_client->stats.num_active_vip_connections) == 1);

ASSERT_TRUE(test_update_connections_finish_result_user_data.request == NULL);
ASSERT_TRUE(test_update_connections_finish_result_user_data.call_counter == 0);

ASSERT_TRUE(vip_connection->is_warm);
ASSERT_TRUE(vip_connection->is_active);
ASSERT_TRUE(vip_connection->request == request);

aws_s3_request_release(request);
}

aws_s3_meta_request_release(mock_meta_request);
Expand Down

0 comments on commit cab7afd

Please sign in to comment.